package app import ( "errors" acl "acl.orly" "encoders.orly/envelopes/authenvelope" "encoders.orly/envelopes/closedenvelope" "encoders.orly/envelopes/eoseenvelope" "encoders.orly/envelopes/eventenvelope" "encoders.orly/envelopes/okenvelope" "encoders.orly/envelopes/reqenvelope" "encoders.orly/event" "encoders.orly/filter" "encoders.orly/reason" "encoders.orly/tag" "github.com/dgraph-io/badger/v4" "lol.mleku.dev/chk" "lol.mleku.dev/log" "utils.orly/normalize" "utils.orly/pointers" ) func (l *Listener) HandleReq(msg []byte) ( err error, ) { var rem []byte env := reqenvelope.New() if rem, err = env.Unmarshal(msg); chk.E(err) { return normalize.Error.Errorf(err.Error()) } if len(rem) > 0 { log.I.F("extra '%s'", rem) } // // send a challenge to the client to auth if an ACL is active and not authed // if acl.Registry.Active.Load() != "none" && l.authedPubkey.Load() == nil { // log.D.F("sending challenge to %s", l.remote) // if err = authenvelope.NewChallengeWith(l.challenge.Load()). // Write(l); chk.E(err) { // // return // } // log.D.F("sending CLOSED to %s", l.remote) // if err = closedenvelope.NewFrom( // env.Subscription, reason.AuthRequired.F("auth required for access"), // ).Write(l); chk.E(err) { // return // } // // ACL is enabled so return and wait for auth // // return // } // send a challenge to the client to auth if an ACL is active if acl.Registry.Active.Load() != "none" { // log.D.F("sending CLOSED to %s", l.remote) // if err = closedenvelope.NewFrom( // env.Subscription, reason.AuthRequired.F("auth required for access"), // ).Write(l); chk.E(err) { // // return // } if err = authenvelope.NewChallengeWith(l.challenge.Load()). Write(l); chk.E(err) { // return } } // check permissions of user accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load()) switch accessLevel { case "none": if err = okenvelope.NewFrom( env.Subscription, false, reason.AuthRequired.F("user not authed or has no read access"), ).Write(l); chk.E(err) { return } return default: // user has read access or better, continue log.D.F("user has %s access", accessLevel) } var events event.S for _, f := range *env.Filters { if pointers.Present(f.Limit) { if *f.Limit == 0 { continue } } if events, err = l.QueryEvents(l.Ctx, f); chk.E(err) { if errors.Is(err, badger.ErrDBClosed) { return } err = nil } } // write out the events to the socket seen := make(map[string]struct{}) for _, ev := range events { // track the IDs we've sent seen[string(ev.ID)] = struct{}{} var res *eventenvelope.Result if res, err = eventenvelope.NewResultWith( env.Subscription, ev, ); chk.E(err) { return } if err = res.Write(l); chk.E(err) { return } } // write the EOSE to signal to the client that all events found have been // sent. log.T.F("sending EOSE to %s", l.remote) if err = eoseenvelope.NewFrom(env.Subscription). Write(l); chk.E(err) { return } // if the query was for just Ids, we know there can't be any more results, // so cancel the subscription. cancel := true var subbedFilters filter.S for _, f := range *env.Filters { if f.Ids.Len() < 1 { cancel = false subbedFilters = append(subbedFilters, f) } else { // remove the IDs that we already sent var notFounds [][]byte for _, ev := range events { if _, ok := seen[string(ev.ID)]; ok { continue } notFounds = append(notFounds, ev.ID) } // if all were found, don't add to subbedFilters if len(notFounds) == 0 { continue } // rewrite the filter Ids to remove the ones we already sent f.Ids = tag.NewFromBytesSlice(notFounds...) // add the filter to the list of filters we're subscribing to subbedFilters = append(subbedFilters, f) } // also, if we received the limit number of events, subscription ded if pointers.Present(f.Limit) { if len(events) < int(*f.Limit) { cancel = false } } } receiver := make(event.C, 32) // if the subscription should be cancelled, do so if !cancel { l.publishers.Receive( &W{ Conn: l.conn, remote: l.remote, Id: string(env.Subscription), Receiver: receiver, Filters: env.Filters, }, ) } else { if err = closedenvelope.NewFrom( env.Subscription, nil, ).Write(l); chk.E(err) { return } } return }