Compare commits

...

6 Commits

14 changed files with 320 additions and 57 deletions

View File

@@ -14,6 +14,11 @@ func (l *Listener) HandleAuth(b []byte) (err error) {
if rem, err = env.Unmarshal(b); chk.E(err) { if rem, err = env.Unmarshal(b); chk.E(err) {
return return
} }
defer func() {
if env != nil && env.Event != nil {
env.Event.Free()
}
}()
if len(rem) > 0 { if len(rem) > 0 {
log.I.F("extra '%s'", rem) log.I.F("extra '%s'", rem)
} }

View File

@@ -21,6 +21,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
if msg, err = env.Unmarshal(msg); chk.E(err) { if msg, err = env.Unmarshal(msg); chk.E(err) {
return return
} }
defer func() {
if env != nil && env.E != nil {
env.E.Free()
}
}()
if len(msg) > 0 { if len(msg) > 0 {
log.I.F("extra '%s'", msg) log.I.F("extra '%s'", msg)
} }
@@ -56,21 +61,13 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
} }
return return
} }
// // send a challenge to the client to auth if an ACL is active and not authed
// if acl.Registry.Active.Load() != "none" && l.authedPubkey.Load() == nil {
// log.D.F("sending challenge to %s", l.remote)
// if err = authenvelope.NewChallengeWith(l.challenge.Load()).
// Write(l); chk.E(err) {
// // return
// }
// // ACL is enabled so return and wait for auth
// // return
// }
// check permissions of user // check permissions of user
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load()) accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
switch accessLevel { switch accessLevel {
case "none": case "none":
log.D.F("handle event: sending CLOSED to %s", l.remote) log.D.F(
"handle event: sending 'OK,false,auth-required...' to %s", l.remote,
)
if err = okenvelope.NewFrom( if err = okenvelope.NewFrom(
env.Id(), false, env.Id(), false,
reason.AuthRequired.F("auth required for write access"), reason.AuthRequired.F("auth required for write access"),
@@ -84,17 +81,20 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
} }
return return
case "read": case "read":
log.D.F("handle event: sending CLOSED to %s", l.remote) log.D.F(
"handle event: sending 'OK,false,auth-required:...' to %s",
l.remote,
)
if err = okenvelope.NewFrom( if err = okenvelope.NewFrom(
env.Id(), false, env.Id(), false,
reason.AuthRequired.F("auth required for write access"), reason.AuthRequired.F("auth required for write access"),
).Write(l); chk.E(err) { ).Write(l); chk.E(err) {
// return return
} }
log.D.F("handle event: sending challenge to %s", l.remote) log.D.F("handle event: sending challenge to %s", l.remote)
if err = authenvelope.NewChallengeWith(l.challenge.Load()). if err = authenvelope.NewChallengeWith(l.challenge.Load()).
Write(l); chk.E(err) { Write(l); chk.E(err) {
// return return
} }
return return
default: default:

View File

@@ -12,11 +12,14 @@ import (
"encoders.orly/envelopes/reqenvelope" "encoders.orly/envelopes/reqenvelope"
"encoders.orly/event" "encoders.orly/event"
"encoders.orly/filter" "encoders.orly/filter"
"encoders.orly/hex"
"encoders.orly/kind"
"encoders.orly/reason" "encoders.orly/reason"
"encoders.orly/tag" "encoders.orly/tag"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/log" "lol.mleku.dev/log"
utils "utils.orly"
"utils.orly/normalize" "utils.orly/normalize"
"utils.orly/pointers" "utils.orly/pointers"
) )
@@ -32,33 +35,11 @@ func (l *Listener) HandleReq(msg []byte) (
if len(rem) > 0 { if len(rem) > 0 {
log.I.F("extra '%s'", rem) log.I.F("extra '%s'", rem)
} }
// // send a challenge to the client to auth if an ACL is active and not authed
// if acl.Registry.Active.Load() != "none" && l.authedPubkey.Load() == nil {
// log.D.F("sending challenge to %s", l.remote)
// if err = authenvelope.NewChallengeWith(l.challenge.Load()).
// Write(l); chk.E(err) {
// // return
// }
// log.D.F("sending CLOSED to %s", l.remote)
// if err = closedenvelope.NewFrom(
// env.Subscription, reason.AuthRequired.F("auth required for access"),
// ).Write(l); chk.E(err) {
// return
// }
// // ACL is enabled so return and wait for auth
// // return
// }
// send a challenge to the client to auth if an ACL is active // send a challenge to the client to auth if an ACL is active
if acl.Registry.Active.Load() != "none" { if acl.Registry.Active.Load() != "none" {
// log.D.F("sending CLOSED to %s", l.remote)
// if err = closedenvelope.NewFrom(
// env.Subscription, reason.AuthRequired.F("auth required for access"),
// ).Write(l); chk.E(err) {
// // return
// }
if err = authenvelope.NewChallengeWith(l.challenge.Load()). if err = authenvelope.NewChallengeWith(l.challenge.Load()).
Write(l); chk.E(err) { Write(l); chk.E(err) {
// return return
} }
} }
// check permissions of user // check permissions of user
@@ -90,7 +71,48 @@ func (l *Listener) HandleReq(msg []byte) (
err = nil err = nil
} }
} }
// write out the events to the socket var tmp event.S
privCheck:
for _, ev := range events {
if kind.IsPrivileged(ev.Kind) &&
accessLevel != "admin" { // admins can see all events
log.I.F("checking privileged event %s", ev.ID)
pk := l.authedPubkey.Load()
if pk == nil {
continue
}
if utils.FastEqual(ev.Pubkey, pk) {
log.I.F(
"privileged event %s is for logged in pubkey %0x", ev.ID,
pk,
)
tmp = append(tmp, ev)
continue
}
pTags := ev.Tags.GetAll([]byte("p"))
for _, pTag := range pTags {
var pt []byte
if pt, err = hex.Dec(string(pTag.Value())); chk.E(err) {
continue
}
if utils.FastEqual(pt, pk) {
log.I.F(
"privileged event %s is for logged in pubkey %0x",
ev.ID, pk,
)
tmp = append(tmp, ev)
continue privCheck
}
}
log.W.F(
"privileged event %s does not contain the logged in pubkey %0x",
ev.ID, pk,
)
} else {
tmp = append(tmp, ev)
}
}
events = tmp
seen := make(map[string]struct{}) seen := make(map[string]struct{})
for _, ev := range events { for _, ev := range events {
// track the IDs we've sent // track the IDs we've sent

View File

@@ -34,7 +34,6 @@ func Run(
} }
adminKeys = append(adminKeys, pk) adminKeys = append(adminKeys, pk)
} }
// start listener // start listener
l := &Server{ l := &Server{
Ctx: ctx, Ctx: ctx,

View File

@@ -30,9 +30,10 @@ func main() {
os.Exit(1) os.Exit(1)
} }
acl.Registry.Active.Store(cfg.ACLMode) acl.Registry.Active.Store(cfg.ACLMode)
if err = acl.Registry.Configure(cfg, db); chk.E(err) { if err = acl.Registry.Configure(cfg, db, ctx); chk.E(err) {
os.Exit(1) os.Exit(1)
} }
acl.Registry.Syncer()
quit := app.Run(ctx, cfg, db) quit := app.Run(ctx, cfg, db)
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)

View File

@@ -48,6 +48,15 @@ func (s *S) GetACLInfo() (name, description, documentation string) {
return return
} }
func (s *S) Syncer() {
for _, i := range s.ACL {
if i.Type() == s.Active.Load() {
i.Syncer()
break
}
}
}
func (s *S) Type() (typ string) { func (s *S) Type() (typ string) {
for _, i := range s.ACL { for _, i := range s.ACL {
if i.Type() == s.Active.Load() { if i.Type() == s.Active.Load() {

View File

@@ -1,30 +1,43 @@
package acl package acl
import ( import (
"context"
"reflect" "reflect"
"strings"
"sync" "sync"
"time"
database "database.orly" database "database.orly"
"database.orly/indexes/types" "database.orly/indexes/types"
"encoders.orly/bech32encoding" "encoders.orly/bech32encoding"
"encoders.orly/envelopes"
"encoders.orly/envelopes/eoseenvelope"
"encoders.orly/envelopes/eventenvelope"
"encoders.orly/envelopes/reqenvelope"
"encoders.orly/event" "encoders.orly/event"
"encoders.orly/filter" "encoders.orly/filter"
"encoders.orly/hex" "encoders.orly/hex"
"encoders.orly/kind" "encoders.orly/kind"
"encoders.orly/tag" "encoders.orly/tag"
"github.com/coder/websocket"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/errorf" "lol.mleku.dev/errorf"
"lol.mleku.dev/log" "lol.mleku.dev/log"
"next.orly.dev/app/config" "next.orly.dev/app/config"
utils "utils.orly" utils "utils.orly"
"utils.orly/normalize"
"utils.orly/values"
) )
type Follows struct { type Follows struct {
Ctx context.Context
cfg *config.C cfg *config.C
*database.D *database.D
followsMx sync.RWMutex followsMx sync.RWMutex
admins [][]byte admins [][]byte
follows [][]byte follows [][]byte
updated chan struct{}
subsCancel context.CancelFunc
} }
func (f *Follows) Configure(cfg ...any) (err error) { func (f *Follows) Configure(cfg ...any) (err error) {
@@ -37,6 +50,9 @@ func (f *Follows) Configure(cfg ...any) (err error) {
case *database.D: case *database.D:
log.D.F("setting ACL database: %s", c.Path()) log.D.F("setting ACL database: %s", c.Path())
f.D = c f.D = c
case context.Context:
log.D.F("setting ACL context: %s", c.Value("id"))
f.Ctx = c
default: default:
err = errorf.E("invalid type: %T", reflect.TypeOf(ca)) err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
} }
@@ -92,6 +108,11 @@ func (f *Follows) Configure(cfg ...any) (err error) {
} }
} }
} }
if f.updated == nil {
f.updated = make(chan struct{})
} else {
f.updated <- struct{}{}
}
return return
} }
@@ -121,6 +142,199 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) {
func (f *Follows) Type() string { return "follows" } func (f *Follows) Type() string { return "follows" }
func (f *Follows) adminRelays() (urls []string) {
f.followsMx.RLock()
admins := make([][]byte, len(f.admins))
copy(admins, f.admins)
f.followsMx.RUnlock()
seen := make(map[string]struct{})
for _, adm := range admins {
fl := &filter.F{
Authors: tag.NewFromAny(adm),
Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
}
idxs, err := database.GetIndexesFromFilter(fl)
if chk.E(err) {
continue
}
var sers types.Uint40s
for _, idx := range idxs {
s, err := f.D.GetSerialsByRange(idx)
if chk.E(err) {
continue
}
sers = append(sers, s...)
}
for _, s := range sers {
ev, err := f.D.FetchEventBySerial(s)
if chk.E(err) || ev == nil {
continue
}
for _, v := range ev.Tags.GetAll([]byte("r")) {
u := string(v.Value())
n := string(normalize.URL(u))
if n == "" {
continue
}
if _, ok := seen[n]; ok {
continue
}
seen[n] = struct{}{}
urls = append(urls, n)
}
}
}
return
}
func (f *Follows) startSubscriptions(ctx context.Context) {
// build authors list: admins + follows
f.followsMx.RLock()
authors := make([][]byte, 0, len(f.admins)+len(f.follows))
authors = append(authors, f.admins...)
authors = append(authors, f.follows...)
f.followsMx.RUnlock()
if len(authors) == 0 {
log.W.F("follows syncer: no authors (admins+follows) to subscribe to")
return
}
urls := f.adminRelays()
if len(urls) == 0 {
log.W.F("follows syncer: no admin relays found in DB (kind 10002)")
return
}
log.I.F(
"follows syncer: subscribing to %d relays for %d authors", len(urls),
len(authors),
)
for _, u := range urls {
u := u
go func() {
backoff := time.Second
for {
select {
case <-ctx.Done():
return
default:
}
c, _, err := websocket.Dial(ctx, u, nil)
if err != nil {
log.W.F("follows syncer: dial %s failed: %v", u, err)
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
return
case <-timer.C:
}
if backoff < 30*time.Second {
backoff *= 2
}
continue
}
backoff = time.Second
// send REQ
ff := &filter.S{}
f1 := &filter.F{
Authors: tag.NewFromBytesSlice(authors...),
Limit: values.ToUintPointer(0),
}
*ff = append(*ff, f1)
req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
if err := c.Write(
ctx, websocket.MessageText, req.Marshal(nil),
); chk.E(err) {
_ = c.Close(websocket.StatusInternalError, "write failed")
continue
}
log.I.F("sent REQ to %s for follows subscription", u)
// read loop
for {
select {
case <-ctx.Done():
_ = c.Close(websocket.StatusNormalClosure, "ctx done")
return
default:
}
_, data, err := c.Read(ctx)
if err != nil {
_ = c.Close(websocket.StatusNormalClosure, "read err")
break
}
label, rem, err := envelopes.Identify(data)
if chk.E(err) {
continue
}
switch label {
case eventenvelope.L:
res, _, err := eventenvelope.ParseResult(rem)
if chk.E(err) || res == nil || res.Event == nil {
continue
}
// verify signature before saving
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
continue
}
if _, _, err := f.D.SaveEvent(
ctx, res.Event,
); err != nil {
if !strings.HasPrefix(
err.Error(), "event already exists",
) {
log.W.F(
"follows syncer: save event failed: %v",
err,
)
}
// ignore duplicates and continue
}
log.I.F(
"saved new event from follows syncer: %0x",
res.Event.ID,
)
case eoseenvelope.L:
// ignore, continue subscription
default:
// ignore other labels
}
}
// loop reconnect
}
}()
}
}
func (f *Follows) Syncer() {
log.I.F("starting follows syncer")
go func() {
// start immediately if Configure already ran
for {
var innerCancel context.CancelFunc
select {
case <-f.Ctx.Done():
if f.subsCancel != nil {
f.subsCancel()
}
return
case <-f.updated:
// close and reopen subscriptions to users on the follow list and admins
if f.subsCancel != nil {
log.I.F("follows syncer: cancelling existing subscriptions")
f.subsCancel()
}
ctx, cancel := context.WithCancel(f.Ctx)
f.subsCancel = cancel
innerCancel = cancel
log.I.F("follows syncer: (re)opening subscriptions")
f.startSubscriptions(ctx)
}
// small sleep to avoid tight loop if updated fires rapidly
if innerCancel == nil {
time.Sleep(50 * time.Millisecond)
}
}
}()
}
func init() { func init() {
log.T.F("registering follows ACL") log.T.F("registering follows ACL")
Registry.Register(new(Follows)) Registry.Register(new(Follows))

View File

@@ -20,6 +20,8 @@ func (n None) Type() string {
return "none" return "none"
} }
func (n None) Syncer() {}
func init() { func init() {
log.T.F("registering none ACL") log.T.F("registering none ACL")
Registry.Register(new(None)) Registry.Register(new(None))

View File

@@ -49,6 +49,7 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
if _, err = w.Write([]byte{'\n'}); chk.E(err) { if _, err = w.Write([]byte{'\n'}); chk.E(err) {
return return
} }
ev.Free()
evBuf.Reset() evBuf.Reset()
} }
return return
@@ -86,14 +87,15 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
if err = ev.UnmarshalBinary(evBuf); chk.E(err) { if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
continue continue
} }
// Serialize the event to JSON and write it to the output // Serialize the event to JSON and write it to the output
if _, err = w.Write(ev.Serialize()); chk.E(err) { if _, err = w.Write(ev.Serialize()); chk.E(err) {
continue continue
} }
if _, err = w.Write([]byte{'\n'}); chk.E(err) { if _, err = w.Write([]byte{'\n'}); chk.E(err) {
continue continue
} }
evBuf.Reset() ev.Free()
evBuf.Reset()
} }
return return
}, },

View File

@@ -52,17 +52,22 @@ func (d *D) Import(rr io.Reader) {
continue continue
} }
ev := &event.E{} ev := event.New()
if _, err = ev.Unmarshal(b); err != nil { if _, err = ev.Unmarshal(b); err != nil {
// return the pooled buffer on error
ev.Free()
continue continue
} }
if _, _, err = d.SaveEvent(d.ctx, ev); err != nil { if _, _, err = d.SaveEvent(d.ctx, ev); err != nil {
// return the pooled buffer on error paths too
ev.Free()
continue continue
} }
// return the pooled buffer after successful save
ev.Free()
b = nil b = nil
ev = nil
count++ count++
if count%100 == 0 { if count%100 == 0 {
log.I.F("received %d events", count) log.I.F("received %d events", count)

View File

@@ -78,9 +78,9 @@ var Privileged = []*K{
// IsPrivileged returns true if the type is the kind of message nobody else than // IsPrivileged returns true if the type is the kind of message nobody else than
// the pubkeys in the event and p tags of the event are party to. // the pubkeys in the event and p tags of the event are party to.
func (k *K) IsPrivileged() (is bool) { func IsPrivileged(k uint16) (is bool) {
for i := range Privileged { for i := range Privileged {
if k.Equal(Privileged[i].K) { if k == Privileged[i].K {
return true return true
} }
} }

View File

@@ -142,7 +142,7 @@ func (k *S) Unmarshal(b []byte) (r []byte, err error) {
// be privacy protected). // be privacy protected).
func (k *S) IsPrivileged() (priv bool) { func (k *S) IsPrivileged() (priv bool) {
for i := range k.K { for i := range k.K {
if k.K[i].IsPrivileged() { if IsPrivileged(k.K[i].K) {
return true return true
} }
} }

View File

@@ -27,5 +27,9 @@ type I interface {
// explain briefly how it works, and then a long text of documentation of // explain briefly how it works, and then a long text of documentation of
// the ACL's rules and configuration (in asciidoc or markdown). // the ACL's rules and configuration (in asciidoc or markdown).
GetACLInfo() (name, description, documentation string) GetACLInfo() (name, description, documentation string)
// Syncer is a worker thread that does things in the background like syncing
// with other relays on admin relay lists using subscriptions for all events
// that arrive elsewhere relevant to the ACL scheme.
Syncer()
typer.T typer.T
} }

View File

@@ -1 +1 @@
v0.1.0 v0.2.0