Compare commits


8 Commits

Author SHA1 Message Date
1d9a6903b8 bump version
2025-11-14 12:18:01 +00:00
29e175efb0 implement event table subtyping for small events in value log
2025-11-14 12:15:52 +00:00
7169a2158f when in "none" ACL mode, privileged checks are not enforced
2025-11-13 08:31:02 +00:00
baede6d37f extend script test to two read two write to ensure script continues running
2025-11-11 15:24:58 +00:00
3e7cc01d27 make script stderr print into relay logs
2025-11-11 14:41:54 +00:00
cc99fcfab5 bump to v0.27.5
2025-11-11 14:38:05 +00:00
b2056b6636 bump to v0.27.5
2025-11-11 13:48:23 +00:00
108cbdce93 fix docker image cleanups in test 2025-11-11 13:47:57 +00:00
23 changed files with 3036 additions and 139 deletions

View File

@@ -46,7 +46,9 @@
"Bash(git rm:*)",
"Bash(git add:*)",
"Bash(./test-policy.sh:*)",
"Bash(docker rm:*)"
"Bash(docker rm:*)",
"Bash(./scripts/docker-policy/test-policy.sh:*)",
"Bash(./policytest:*)"
],
"deny": [],
"ask": []

View File

@@ -661,6 +661,8 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
l.subscriptionsMu.Unlock()
// Register subscription with publisher
// Set AuthRequired based on ACL mode - when ACL is "none", don't require auth for privileged events
authRequired := acl.Registry.Active.Load() != "none"
l.publishers.Receive(
&W{
Conn: l.conn,
@@ -669,6 +671,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
Receiver: receiver,
Filters: &subbedFilters,
AuthedPubkey: l.authedPubkey.Load(),
AuthRequired: authRequired,
},
)
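The flag is computed once per REQ and then travels with the subscription, so the publisher's Deliver loop (diffed further below) can skip the privileged-event check entirely when the ACL mode is "none". A minimal sketch of the resulting gate, using simplified stand-ins for the real types:

```go
package main

import "fmt"

// subscription is a simplified stand-in for the Subscription type in the
// publisher diff below.
type subscription struct {
	authedPubkey []byte
	authRequired bool // false when the ACL mode is "none"
}

// isPrivileged is a stand-in; the real check lives in the kind package.
func isPrivileged(k uint16) bool { return k == 4 }

// canDeliver mirrors the gate added to Deliver: privileged events are only
// withheld from unauthenticated listeners when an ACL is actually active.
func canDeliver(sub subscription, k uint16) bool {
	if isPrivileged(k) && sub.authRequired {
		// The real code also requires the authed pubkey to match the event
		// pubkey or appear in a 'p' tag; this sketch only checks presence.
		return len(sub.authedPubkey) != 0
	}
	return true
}

func main() {
	fmt.Println(canDeliver(subscription{authRequired: true}, 4))  // false
	fmt.Println(canDeliver(subscription{authRequired: false}, 4)) // true: ACL "none"
}
```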

View File

@@ -1,13 +1,12 @@
package app
import (
"next.orly.dev/pkg/interfaces/signer/p8k"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"next.orly.dev/pkg/interfaces/signer/p8k"
"os"
"path/filepath"
"testing"
"time"
@@ -75,13 +74,15 @@ func setupE2ETest(t *testing.T) (*Server, *httptest.Server, func()) {
server.mux = http.NewServeMux()
// Set up HTTP handlers
server.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Accept") == "application/nostr+json" {
server.HandleRelayInfo(w, r)
return
}
http.NotFound(w, r)
})
server.mux.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Accept") == "application/nostr+json" {
server.HandleRelayInfo(w, r)
return
}
http.NotFound(w, r)
},
)
httpServer := httptest.NewServer(server.mux)
@@ -133,7 +134,10 @@ func TestE2E_RelayInfoIncludesNIP43(t *testing.T) {
// Verify server name
if info.Name != server.Config.AppName {
t.Errorf("wrong relay name: got %s, want %s", info.Name, server.Config.AppName)
t.Errorf(
"wrong relay name: got %s, want %s", info.Name,
server.Config.AppName,
)
}
}
@@ -205,7 +209,10 @@ func TestE2E_CompleteJoinFlow(t *testing.T) {
t.Fatalf("failed to get membership: %v", err)
}
if membership.InviteCode != inviteCode {
t.Errorf("wrong invite code: got %s, want %s", membership.InviteCode, inviteCode)
t.Errorf(
"wrong invite code: got %s, want %s", membership.InviteCode,
inviteCode,
)
}
}
@@ -355,6 +362,9 @@ func TestE2E_ExpiredInviteCode(t *testing.T) {
}
defer os.RemoveAll(tempDir)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db, err := database.New(ctx, cancel, tempDir, "info")
if err != nil {
t.Fatalf("failed to open database: %v", err)
@@ -366,8 +376,6 @@ func TestE2E_ExpiredInviteCode(t *testing.T) {
NIP43InviteExpiry: 1 * time.Millisecond, // Very short expiry
}
ctx := context.Background()
server := &Server{
Ctx: ctx,
Config: cfg,
@@ -498,7 +506,10 @@ func BenchmarkJoinRequestProcessing(b *testing.B) {
}
defer os.RemoveAll(tempDir)
db, err := database.Open(filepath.Join(tempDir, "test.db"), "error")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db, err := database.New(ctx, cancel, tempDir, "error")
if err != nil {
b.Fatalf("failed to open database: %v", err)
}
@@ -509,8 +520,6 @@ func BenchmarkJoinRequestProcessing(b *testing.B) {
NIP43InviteExpiry: 24 * time.Hour,
}
ctx := context.Background()
server := &Server{
Ctx: ctx,
Config: cfg,

View File

@@ -28,6 +28,7 @@ type Subscription struct {
remote string
AuthedPubkey []byte
Receiver event.C // Channel for delivering events to this subscription
AuthRequired bool // Whether ACL requires authentication for privileged events
*filter.S
}
@@ -58,6 +59,11 @@ type W struct {
// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
AuthedPubkey []byte
// AuthRequired indicates whether the ACL in operation requires auth. If
// this is set to true, the publisher will not publish privileged or other
// restricted events to non-authed listeners, otherwise, it will.
AuthRequired bool
}
func (w *W) Type() (typeName string) { return Type }
@@ -87,7 +93,6 @@ func NewPublisher(c context.Context) (publisher *P) {
func (p *P) Type() (typeName string) { return Type }
// Receive handles incoming messages to manage websocket listener subscriptions
// and associated filters.
//
@@ -120,12 +125,14 @@ func (p *P) Receive(msg typer.T) {
if subs, ok := p.Map[m.Conn]; !ok {
subs = make(map[string]Subscription)
subs[m.Id] = Subscription{
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver,
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
Receiver: m.Receiver, AuthRequired: m.AuthRequired,
}
p.Map[m.Conn] = subs
} else {
subs[m.Id] = Subscription{
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver,
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
Receiver: m.Receiver, AuthRequired: m.AuthRequired,
}
}
}
@@ -174,11 +181,14 @@ func (p *P) Deliver(ev *event.E) {
for _, d := range deliveries {
// If the event is privileged, enforce that the subscriber's authed pubkey matches
// either the event pubkey or appears in any 'p' tag of the event.
if kind.IsPrivileged(ev.Kind) {
// Only check authentication if AuthRequired is true (ACL is active)
if kind.IsPrivileged(ev.Kind) && d.sub.AuthRequired {
if len(d.sub.AuthedPubkey) == 0 {
// Not authenticated - cannot see privileged events
log.D.F("subscription delivery DENIED for privileged event %s to %s (not authenticated)",
hex.Enc(ev.ID), d.sub.remote)
log.D.F(
"subscription delivery DENIED for privileged event %s to %s (not authenticated)",
hex.Enc(ev.ID), d.sub.remote,
)
continue
}
@@ -201,8 +211,10 @@ func (p *P) Deliver(ev *event.E) {
}
}
if !allowed {
log.D.F("subscription delivery DENIED for privileged event %s to %s (auth mismatch)",
hex.Enc(ev.ID), d.sub.remote)
log.D.F(
"subscription delivery DENIED for privileged event %s to %s (auth mismatch)",
hex.Enc(ev.ID), d.sub.remote,
)
// Skip delivery for this subscriber
continue
}
@@ -225,26 +237,37 @@ func (p *P) Deliver(ev *event.E) {
}
if hasPrivateTag {
canSeePrivate := p.canSeePrivateEvent(d.sub.AuthedPubkey, privatePubkey, d.sub.remote)
canSeePrivate := p.canSeePrivateEvent(
d.sub.AuthedPubkey, privatePubkey, d.sub.remote,
)
if !canSeePrivate {
log.D.F("subscription delivery DENIED for private event %s to %s (unauthorized)",
hex.Enc(ev.ID), d.sub.remote)
log.D.F(
"subscription delivery DENIED for private event %s to %s (unauthorized)",
hex.Enc(ev.ID), d.sub.remote,
)
continue
}
log.D.F("subscription delivery ALLOWED for private event %s to %s (authorized)",
hex.Enc(ev.ID), d.sub.remote)
log.D.F(
"subscription delivery ALLOWED for private event %s to %s (authorized)",
hex.Enc(ev.ID), d.sub.remote,
)
}
}
// Send event to the subscription's receiver channel
// The consumer goroutine (in handle-req.go) will read from this channel
// and forward it to the client via the write channel
log.D.F("attempting delivery of event %s (kind=%d) to subscription %s @ %s",
hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote)
log.D.F(
"attempting delivery of event %s (kind=%d) to subscription %s @ %s",
hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote,
)
// Check if receiver channel exists
if d.sub.Receiver == nil {
log.E.F("subscription %s has nil receiver channel for %s", d.id, d.sub.remote)
log.E.F(
"subscription %s has nil receiver channel for %s", d.id,
d.sub.remote,
)
continue
}
@@ -253,11 +276,15 @@ func (p *P) Deliver(ev *event.E) {
case <-p.c.Done():
continue
case d.sub.Receiver <- ev:
log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s",
hex.Enc(ev.ID), d.sub.remote, d.id)
log.D.F(
"subscription delivery QUEUED: event=%s to=%s sub=%s",
hex.Enc(ev.ID), d.sub.remote, d.id,
)
case <-time.After(DefaultWriteTimeout):
log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s",
hex.Enc(ev.ID), d.sub.remote, d.id)
log.E.F(
"subscription delivery TIMEOUT: event=%s to=%s sub=%s",
hex.Enc(ev.ID), d.sub.remote, d.id,
)
// Receiver channel is full - subscription consumer is stuck or slow
// The subscription should be removed by the cleanup logic
}
@@ -285,7 +312,9 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
// SetWriteChan stores the write channel for a websocket connection
// If writeChan is nil, the entry is removed from the map
func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteRequest) {
func (p *P) SetWriteChan(
conn *websocket.Conn, writeChan chan publish.WriteRequest,
) {
p.Mx.Lock()
defer p.Mx.Unlock()
if writeChan == nil {
@@ -296,7 +325,9 @@ func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteReque
}
// GetWriteChan returns the write channel for a websocket connection
func (p *P) GetWriteChan(conn *websocket.Conn) (chan publish.WriteRequest, bool) {
func (p *P) GetWriteChan(conn *websocket.Conn) (
chan publish.WriteRequest, bool,
) {
p.Mx.RLock()
defer p.Mx.RUnlock()
ch, ok := p.WriteChans[conn]
@@ -313,7 +344,9 @@ func (p *P) removeSubscriber(ws *websocket.Conn) {
}
// canSeePrivateEvent checks if the authenticated user can see an event with a private tag
func (p *P) canSeePrivateEvent(authedPubkey, privatePubkey []byte, remote string) (canSee bool) {
func (p *P) canSeePrivateEvent(
authedPubkey, privatePubkey []byte, remote string,
) (canSee bool) {
// If no authenticated user, deny access
if len(authedPubkey) == 0 {
return false

View File

@@ -1 +1,17 @@
test
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>ORLY?</title>
<link rel="icon" type="image/png" href="/favicon.png" />
<link rel="stylesheet" href="/global.css" />
<link rel="stylesheet" href="/bundle.css" />
<script defer src="/bundle.js"></script>
</head>
<body></body>
</html>

View File

@@ -8,20 +8,24 @@ import (
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/interfaces/signer/p8k"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/interfaces/signer/p8k"
"next.orly.dev/pkg/protocol/ws"
)
func main() {
var err error
url := flag.String("url", "ws://127.0.0.1:3334", "relay websocket URL")
timeout := flag.Duration("timeout", 20*time.Second, "publish timeout")
timeout := flag.Duration("timeout", 20*time.Second, "operation timeout")
testType := flag.String("type", "event", "test type: 'event' for write control, 'req' for read control, 'both' for both, 'publish-and-query' for full test")
eventKind := flag.Int("kind", 4678, "event kind to test")
numEvents := flag.Int("count", 2, "number of events to publish (for publish-and-query)")
flag.Parse()
// Minimal client that publishes a single kind 4678 event and reports OK/err
// Connect to relay
var rl *ws.Client
if rl, err = ws.RelayConnect(context.Background(), *url); chk.E(err) {
log.E.F("connect error: %v", err)
@@ -29,6 +33,7 @@ func main() {
}
defer rl.Close()
// Create signer
var signer *p8k.Signer
if signer, err = p8k.New(); chk.E(err) {
log.E.F("signer create error: %v", err)
@@ -39,26 +44,186 @@ func main() {
return
}
// Perform tests based on type
switch *testType {
case "event":
testEventWrite(rl, signer, *eventKind, *timeout)
case "req":
testReqRead(rl, signer, *eventKind, *timeout)
case "both":
log.I.Ln("Testing EVENT (write control)...")
testEventWrite(rl, signer, *eventKind, *timeout)
log.I.Ln("\nTesting REQ (read control)...")
testReqRead(rl, signer, *eventKind, *timeout)
case "publish-and-query":
testPublishAndQuery(rl, signer, *eventKind, *numEvents, *timeout)
default:
log.E.F("invalid test type: %s (must be 'event', 'req', 'both', or 'publish-and-query')", *testType)
}
}
func testEventWrite(rl *ws.Client, signer *p8k.Signer, eventKind int, timeout time.Duration) {
ev := &event.E{
CreatedAt: time.Now().Unix(),
Kind: kind.K{K: 4678}.K, // arbitrary custom kind
Kind: uint16(eventKind),
Tags: tag.NewS(),
Content: []byte("policy test: expect rejection"),
Content: []byte("policy test: expect rejection for write"),
}
if err = ev.Sign(signer); chk.E(err) {
if err := ev.Sign(signer); chk.E(err) {
log.E.F("sign error: %v", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err = rl.Publish(ctx, ev); err != nil {
if err := rl.Publish(ctx, ev); err != nil {
// Expected path if policy rejects: client returns error with reason (from OK false)
fmt.Println("policy reject:", err)
fmt.Println("EVENT policy reject:", err)
return
}
log.I.Ln("publish result: accepted")
fmt.Println("ACCEPT")
log.I.Ln("EVENT publish result: accepted")
fmt.Println("EVENT ACCEPT")
}
func testReqRead(rl *ws.Client, signer *p8k.Signer, eventKind int, timeout time.Duration) {
// First, publish a test event to the relay that we'll try to query
testEvent := &event.E{
CreatedAt: time.Now().Unix(),
Kind: uint16(eventKind),
Tags: tag.NewS(),
Content: []byte("policy test: event for read control test"),
}
if err := testEvent.Sign(signer); chk.E(err) {
log.E.F("sign error: %v", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// Try to publish the test event first (ignore errors if policy rejects)
_ = rl.Publish(ctx, testEvent)
log.I.F("published test event kind %d for read testing", eventKind)
// Now try to query for events of this kind
limit := uint(10)
f := &filter.F{
Kinds: kind.FromIntSlice([]int{eventKind}),
Limit: &limit,
}
ctx2, cancel2 := context.WithTimeout(context.Background(), timeout)
defer cancel2()
events, err := rl.QuerySync(ctx2, f)
if chk.E(err) {
log.E.F("query error: %v", err)
fmt.Println("REQ query error:", err)
return
}
// Check if we got the expected events
if len(events) == 0 {
// Could mean policy filtered it out, or it wasn't stored
fmt.Println("REQ policy reject: no events returned (filtered by read policy)")
log.I.F("REQ result: no events of kind %d returned (policy filtered or not stored)", eventKind)
return
}
// Events were returned - read access allowed
fmt.Printf("REQ ACCEPT: %d events returned\n", len(events))
log.I.F("REQ result: %d events of kind %d returned", len(events), eventKind)
}
func testPublishAndQuery(rl *ws.Client, signer *p8k.Signer, eventKind int, numEvents int, timeout time.Duration) {
log.I.F("Publishing %d events of kind %d...", numEvents, eventKind)
publishedIDs := make([][]byte, 0, numEvents)
acceptedCount := 0
rejectedCount := 0
// Publish multiple events
for i := 0; i < numEvents; i++ {
ev := &event.E{
CreatedAt: time.Now().Unix() + int64(i), // Slightly different timestamps
Kind: uint16(eventKind),
Tags: tag.NewS(),
Content: []byte(fmt.Sprintf("policy test event %d/%d", i+1, numEvents)),
}
if err := ev.Sign(signer); chk.E(err) {
log.E.F("sign error for event %d: %v", i+1, err)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
err := rl.Publish(ctx, ev)
cancel()
if err != nil {
log.W.F("Event %d/%d rejected: %v", i+1, numEvents, err)
rejectedCount++
} else {
log.I.F("Event %d/%d published successfully (id: %x...)", i+1, numEvents, ev.ID[:8])
publishedIDs = append(publishedIDs, ev.ID)
acceptedCount++
}
}
fmt.Printf("PUBLISH: %d accepted, %d rejected out of %d total\n", acceptedCount, rejectedCount, numEvents)
if acceptedCount == 0 {
fmt.Println("No events were accepted, skipping query test")
return
}
// Wait a moment for events to be stored
time.Sleep(500 * time.Millisecond)
// Now query for events of this kind
log.I.F("Querying for events of kind %d...", eventKind)
limit := uint(100)
f := &filter.F{
Kinds: kind.FromIntSlice([]int{eventKind}),
Limit: &limit,
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
events, err := rl.QuerySync(ctx, f)
if chk.E(err) {
log.E.F("query error: %v", err)
fmt.Println("QUERY ERROR:", err)
return
}
log.I.F("Query returned %d events", len(events))
// Check if we got our published events back
foundCount := 0
for _, pubID := range publishedIDs {
found := false
for _, ev := range events {
if string(ev.ID) == string(pubID) {
found = true
break
}
}
if found {
foundCount++
}
}
fmt.Printf("QUERY: found %d/%d published events (total returned: %d)\n", foundCount, len(publishedIDs), len(events))
if foundCount == len(publishedIDs) {
fmt.Println("SUCCESS: All published events were retrieved")
} else if foundCount > 0 {
fmt.Printf("PARTIAL: Only %d/%d events retrieved (some filtered by read policy?)\n", foundCount, len(publishedIDs))
} else {
fmt.Println("FAILURE: None of the published events were retrieved (read policy blocked?)")
}
}

View File

@@ -361,6 +361,279 @@ Place scripts in a secure location and reference them in policy:
Ensure scripts are executable and have appropriate permissions.
### Script Requirements and Best Practices
#### Critical Requirements
**1. Output Only JSON to stdout**
Scripts MUST write ONLY JSON responses to stdout. Any other output (debug messages, logs, etc.) will break the JSONL protocol and cause errors.
**Debug Output**: Use stderr for debug messages - all stderr output from policy scripts is automatically logged to the relay log with the prefix `[policy script /path/to/script]`.
```javascript
// ❌ WRONG - This will cause "broken pipe" errors
console.log("Policy script starting..."); // This goes to stdout!
console.log(JSON.stringify(response)); // Correct
// ✅ CORRECT - Use stderr or file for debug output
console.error("Policy script starting..."); // This goes to stderr (appears in relay log)
fs.appendFileSync('/tmp/policy.log', 'Starting...\n'); // This goes to file (OK)
console.log(JSON.stringify(response)); // Stdout for JSON only
```
**2. Flush stdout After Each Response**
Always flush stdout after writing a response to ensure immediate delivery:
```python
# Python
print(json.dumps(response))
sys.stdout.flush() # Critical!
```
```javascript
// Node.js (usually automatic, but can be forced)
process.stdout.write(JSON.stringify(response) + '\n');
```
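The same rules apply to compiled languages. A minimal long-lived policy script in Go (a sketch, assuming only the JSONL protocol described in this section) reads one event per line, writes one JSON response per line, and flushes after each:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

type response struct {
	ID     string `json:"id"`
	Action string `json:"action"`
	Msg    string `json:"msg"`
}

func main() {
	in := bufio.NewScanner(os.Stdin)
	in.Buffer(make([]byte, 0, 1024*1024), 1024*1024) // allow large events
	out := bufio.NewWriter(os.Stdout)
	enc := json.NewEncoder(out) // Encode appends the required trailing newline
	for in.Scan() {
		var ev struct {
			ID string `json:"id"`
		}
		if err := json.Unmarshal(in.Bytes(), &ev); err != nil {
			fmt.Fprintf(os.Stderr, "[policy] parse error: %v\n", err) // stderr only
			enc.Encode(response{Action: "reject", Msg: "Policy script error"})
			out.Flush()
			continue
		}
		enc.Encode(response{ID: ev.ID, Action: "accept"}) // JSON only on stdout
		out.Flush()                                       // critical: deliver immediately
	}
}
```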
**3. Run as a Long-Lived Process**
Scripts should run continuously, reading from stdin in a loop. They should NOT:
- Exit after processing one event
- Use batch processing
- Close stdin/stdout prematurely
```javascript
// ✅ CORRECT - Long-lived process
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', (line) => {
const event = JSON.parse(line);
const response = processEvent(event);
console.log(JSON.stringify(response));
});
```
**4. Handle Errors Gracefully**
Always catch errors and return a valid JSON response:
```javascript
rl.on('line', (line) => {
try {
const event = JSON.parse(line);
const response = processEvent(event);
console.log(JSON.stringify(response));
} catch (err) {
// Log to stderr or file, not stdout!
console.error(`Error: ${err.message}`);
// Return reject response
console.log(JSON.stringify({
id: '',
action: 'reject',
msg: 'Policy script error'
}));
}
});
```
**5. Response Format**
Every response MUST include these fields:
```json
{
"id": "event_id", // Must match input event ID
"action": "accept", // Must be: accept, reject, or shadowReject
"msg": "" // Required (can be empty string)
}
```
#### Common Issues and Solutions
**Broken Pipe Error**
```
ERROR: policy script /path/to/script.js stdin closed (broken pipe)
```
**Causes:**
- Script exited prematurely
- Script wrote non-JSON output to stdout
- Script crashed or encountered an error
- Script closed stdin/stdout incorrectly
**Solutions:**
1. Remove ALL `console.log()` statements except JSON responses
2. Use `console.error()` or log files for debugging
3. Add error handling to catch and log exceptions
4. Ensure script runs continuously (doesn't exit)
**Response Timeout**
```
WARN: policy script /path/to/script.js response timeout - script may not be responding correctly
```
**Causes:**
- Script not flushing stdout
- Script processing taking > 5 seconds
- Script not responding to input
- Non-JSON output consuming a response slot
**Solutions:**
1. Add `sys.stdout.flush()` (Python) after each response
2. Optimize processing logic to be faster
3. Check that script is reading from stdin correctly
4. Remove debug output from stdout
**Invalid JSON Response**
```
ERROR: failed to parse policy response from /path/to/script.js
WARN: policy script produced non-JSON output on stdout: "Debug message"
```
**Solutions:**
1. Validate JSON before outputting
2. Use a JSON library, don't build strings manually
3. Move debug output to stderr or files
#### Testing Your Script
Before deploying, test your script:
```bash
# 1. Test basic functionality
echo '{"id":"test123","pubkey":"abc","kind":1,"content":"test","tags":[],"created_at":1234567890,"sig":"def"}' | node policy-script.js
# 2. Check for non-JSON output
echo '{"id":"test123","pubkey":"abc","kind":1,"content":"test","tags":[],"created_at":1234567890,"sig":"def"}' | node policy-script.js 2>/dev/null | jq .
# 3. Test error handling
echo 'invalid json' | node policy-script.js
```
Expected output (valid JSON only):
```json
{"id":"test123","action":"accept","msg":""}
```
#### Node.js Example (Complete)
```javascript
#!/usr/bin/env node
const readline = require('readline');
// Use stderr for debug logging - appears in relay log automatically
function debug(msg) {
console.error(`[policy] ${msg}`);
}
// Create readline interface
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
debug('Policy script started');
// Process each event
rl.on('line', (line) => {
try {
const event = JSON.parse(line);
debug(`Processing event ${event.id}, kind: ${event.kind}, access: ${event.access_type}`);
// Your policy logic here
const action = shouldAccept(event) ? 'accept' : 'reject';
if (action === 'reject') {
debug(`Rejected event ${event.id}: policy violation`);
}
// ONLY JSON to stdout
console.log(JSON.stringify({
id: event.id,
action: action,
msg: action === 'reject' ? 'Policy rejected' : ''
}));
} catch (err) {
debug(`Error: ${err.message}`);
// Still return valid JSON
console.log(JSON.stringify({
id: '',
action: 'reject',
msg: 'Policy script error'
}));
}
});
rl.on('close', () => {
debug('Policy script stopped');
});
function shouldAccept(event) {
// Your policy logic
if (event.content.toLowerCase().includes('spam')) {
return false;
}
// Different logic for read vs write
if (event.access_type === 'write') {
// Write control logic
return event.content.length < 10000;
} else if (event.access_type === 'read') {
// Read control logic
return true; // Allow all reads
}
return true;
}
```
**Relay Log Output Example:**
```
INFO [policy script /home/orly/.config/ORLY/policy.js] [policy] Policy script started
INFO [policy script /home/orly/.config/ORLY/policy.js] [policy] Processing event abc123, kind: 1, access: write
INFO [policy script /home/orly/.config/ORLY/policy.js] [policy] Processing event def456, kind: 1, access: read
```
#### Event Fields
Scripts receive additional context fields:
```json
{
"id": "event_id",
"pubkey": "author_pubkey",
"kind": 1,
"content": "Event content",
"tags": [],
"created_at": 1234567890,
"sig": "signature",
"logged_in_pubkey": "authenticated_user_pubkey",
"ip_address": "127.0.0.1",
"access_type": "read"
}
```
**access_type values:**
- `"write"`: Event is being stored (EVENT message)
- `"read"`: Event is being retrieved (REQ message)
Use this to implement different policies for reads vs writes.
## Policy Evaluation Order
Events are evaluated in this order:

View File

@@ -0,0 +1,187 @@
Reiser4 had *several* ideas that were too radical for Linux in the 2000s, but **would make a lot of sense today in a modern CoW (copy-on-write) filesystem**—especially one designed for immutable or content-addressed data.
Below is a distilled list of the Reiser4 concepts that *could* be successfully revived and integrated into a next-generation CoW filesystem, along with why they now make more sense and how they would fit.
---
# ✅ **1. Item/extent subtypes (structured metadata records)**
Reiser4 had “item types” that stored different structures within B-tree leaves (e.g., stat-data items, directory items, tail items).
Most filesystems today use coarse-grained extents and metadata blocks—but structured, typed leaf contents provide clear benefits:
### Why it makes sense today:
* CoW filesystems like **APFS**, **Btrfs**, and **ZFS** already have *typed nodes* internally (extent items, dir items).
* Typed leaf records allow:
* Faster parsing
* Future expansion of features
* Better layout for small objects
* Potential content-addressed leaves
A modern CoW filesystem could revive this idea by allowing different **record kinds** within leaf blocks, with stable, versioned formats.
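As a rough sketch (the type names and fields are invented for illustration, not taken from any existing filesystem), typed leaf records might look like:

```go
package cow

// RecordKind tags each record stored in a leaf block.
type RecordKind uint8

const (
	RecStatData   RecordKind = iota // inode-like metadata
	RecDirEntry                     // directory entry
	RecInlineData                   // tail / small-file bytes kept in the leaf
)

// LeafRecord is a typed, versioned record; readers dispatch on Kind+Version,
// so old and new on-disk formats can coexist in one tree.
type LeafRecord struct {
	Kind    RecordKind
	Version uint8
	Key     []byte // tree key for this record
	Payload []byte // parsed according to Kind and Version
}
```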
---
# ✅ **2. Fine-grained small-file optimizations—but integrated with CoW**
Reiser4's small-file packing was too complicated for mutable trees, but in a CoW filesystem it fits perfectly:
### In CoW:
* Leaves are immutable once written.
* Small files can be stored **inline** inside a leaf, or as small extents.
* Deduplication is easier due to immutability.
* Crash consistency is automatic.
### What makes sense to revive:
* Tail-packing / inline-data for files below a threshold
* Possibly grouping many tiny files into a single CoW extent tree page
* Using a “small-files leaf type” with fixed slots
This aligns closely with APFS's and Btrfs's inline extents but could go further—safely—because of CoW.
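A sketch of the resulting placement decision (threshold and types invented for illustration):

```go
package cow

const inlineMax = 2048 // illustrative inline-storage threshold

// Extent points at file data stored outside the leaf.
type Extent struct {
	Offset uint64
	Length uint32
}

// FileData keeps small files inline in the (immutable) leaf and spills large
// ones to a CoW extent; because leaves are written once, the inline copy is
// crash-safe for free.
type FileData struct {
	Inline []byte  // non-nil when the file fits in the leaf
	Extent *Extent // otherwise, a pointer to the extent
}

func place(data []byte, alloc func([]byte) Extent) FileData {
	if len(data) <= inlineMax {
		return FileData{Inline: data}
	}
	e := alloc(data)
	return FileData{Extent: &e}
}
```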
---
# ✅ **3. Semantic plugins *outside the kernel***
Reiser4's plugin system failed because it tried to put a framework *inside the kernel*.
But moving that logic **outside** (as user-space metadata layers or FUSE-like transforms) is realistic today.
### Possible modern implementation:
* A CoW filesystem exposes stable metadata + data primitives.
* User-space “semantic layers” do:
* per-directory views
* virtual inodes
* attribute-driven namespace merges
* versioned or content-addressed overlays
### Why it makes sense:
* User-space is safer and maintainers accept it.
* CoW makes such layers more reliable and more composable.
* Many systems already do this:
* OSTree
* Git virtual filesystem
* container overlayfs
* CephFS metadata layers
The spirit of Reiser4's semantics CAN live on—just not in-kernel.
---
# ✅ **4. Content-addressable objects + trees (Reiser4-like keys)**
Reiser4 had “keyed items” in a tree, which map closely to modern content-addressable storage strategies.
A modern CoW FS could:
* Store leaf blocks by **hash of contents**
* Use stable keyed addressing for trees
* Deduplicate at leaf granularity
* Provide Git/OSTree-style guarantees natively
This is very powerful for immutable or append-only workloads.
### Why it's feasible now:
* Fast hashing hardware
* Widespread use of snapshots, clones, dedupe
* Object-based designs in modern systems (e.g., bcachefs, ZFS)
Reiser4 was ahead of its time here.
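A minimal sketch of hash-addressed leaf storage with automatic deduplication (an in-memory map stands in for the on-disk object store):

```go
package cow

import "crypto/sha256"

// LeafStore keys immutable leaf blocks by the hash of their contents, so
// identical leaves are stored once and can be verified on read.
type LeafStore map[[32]byte][]byte

func (s LeafStore) Put(leaf []byte) [32]byte {
	h := sha256.Sum256(leaf)
	if _, ok := s[h]; !ok { // duplicate leaves deduplicate automatically
		s[h] = append([]byte(nil), leaf...)
	}
	return h
}

func (s LeafStore) Get(h [32]byte) ([]byte, bool) {
	leaf, ok := s[h]
	return leaf, ok
}
```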
---
# ✅ **5. Rich directory structures (hash trees)**
Reiser4's directory semantics were much more flexible, including:
* Extensible directory entries
* Small-directory embedding
* Very fast operations on large directories
Most CoW FSes today use coarse directory structures.
A modern CoW FS could adopt:
* Fixed-format hashed directories for fast lookup
* Optional richer metadata per entry
* Inline storage of tiny directories
Essentially, a more flexible but POSIX-compliant version of Reiser4 directories.
---
# ✅ **6. Atomic multi-item updates via “transaction items”**
Reiser4 had advanced concepts for batched updates via a plugin model, which could be simplified into:
* A single CoW commit representing a set of operations
* Versioned writes to multiple trees
This is similar to what APFS and Btrfs do, but can be made more explicit.
### Why it's relevant:
Modern workloads (containers, datasets, package managers) rely heavily on atomic snapshots—rich commit semantics at the filesystem layer are a big win.
---
# 🧪 **7. Advanced multi-key indexing (“dancing tree” ideas, but simplified)**
Reiser4 used a flexible key scheme for ordering items in the unified tree.
While we don't want "dancing trees" again, a **multi-dimensional key tuple** is extremely useful for:
* Querying by filename + offset
* Efficiently supporting both sparse files and directories
* Custom ordering schemes without rebalancing everything
A CoW filesystem can use simple B-trees or B+trees with structured keys.
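For instance, a structured key tuple can be serialized so that plain byte comparison reproduces the tuple order, which lets any off-the-shelf B-tree index it (a sketch with invented field meanings):

```go
package cow

import (
	"bytes"
	"encoding/binary"
)

// Key clusters all items of one object together: metadata first, then
// directory entries, then data extents, each ordered by Offset.
type Key struct {
	ObjectID uint64 // file or directory identity
	Field    uint8  // 0 = metadata, 1 = dir entry, 2 = data extent
	Offset   uint64 // byte offset or name hash, depending on Field
}

// Encode is big-endian so bytes.Compare on encodings matches tuple order.
func (k Key) Encode() []byte {
	b := make([]byte, 17)
	binary.BigEndian.PutUint64(b[0:8], k.ObjectID)
	b[8] = k.Field
	binary.BigEndian.PutUint64(b[9:17], k.Offset)
	return b
}

func Less(a, b Key) bool { return bytes.Compare(a.Encode(), b.Encode()) < 0 }
```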
---
# 🚫 Reiser4 ideas that *do NOT* make sense even in a CoW filesystem
To be clear, these should **not** be revived:
* In-kernel plugin architecture
* Files-as-directories breaking POSIX
* Dynamic per-file semantic behavior
* Arbitrary user-defined item types in kernel
* Complex item-level transformations at write time
* Dancing-tree hyper-rebalancing policies
These remain too brittle or too unpredictable for a production CoW filesystem.
---
# 🧠 **Summary: What Reiser4 ideas *should* come back in future CoW designs**
### ✔ Good ideas to revive:
1. **Record/item subtypes within leaf nodes** for efficiency
2. **Advanced small-file packing that works with CoW**
3. **Semantic “plugins” in user-space only**
4. **Content-addressable storage for leaves/objects**
5. **Rich, hashed, fast directories**
6. **Explicit atomic multi-tree commit semantics**
7. **Structured multi-field keys inside B-trees**
### ✘ Not worth reviving:
* Kernel plugin system
* Semantics-heavy namespaces
* Dancing trees
* Micro-level tree balancing complexity
* Deeply nested “item-in-item-in-item” structures
* Anything that breaks POSIX semantics
---
If you want, I can sketch out a **concept design** for a hypothetical “Reiser5-like CoW filesystem” that incorporates the good parts but avoids the mistakes—something that could realistically compete with APFS, ZFS, and bcachefs today.

View File

@@ -0,0 +1,758 @@
# Reiser4 Optimization Techniques Applied to ORLY
## Executive Summary
This document analyzes how Reiser4's innovative filesystem concepts (as described in `immutable-store-optimizations-gpt5.md`) can be applied to ORLY's two storage systems:
1. **Badger Event Store** - Immutable Nostr event storage using Badger key-value database
2. **Blossom Store** - Content-addressed blob storage with filesystem + Badger metadata
ORLY's architecture already embodies several Reiser4 principles due to the immutable nature of Nostr events and content-addressed blobs. This analysis identifies concrete optimization opportunities.
---
## Current Architecture Overview
### Badger Event Store
**Storage Model:**
- **Primary key**: `evt|<5-byte serial>` → binary event data
- **Secondary indexes**: Multiple composite keys for queries
- `eid|<8-byte ID hash>|<5-byte serial>` - ID lookup
- `kc-|<2-byte kind>|<8-byte timestamp>|<5-byte serial>` - Kind queries
- `kpc|<2-byte kind>|<8-byte pubkey hash>|<8-byte timestamp>|<5-byte serial>` - Kind+Author
- `tc-|<1-byte tag key>|<8-byte tag hash>|<8-byte timestamp>|<5-byte serial>` - Tag queries
- And 7+ more index patterns
**Characteristics:**
- Events are **immutable** after storage (CoW-friendly)
- Index keys use **structured, typed prefixes** (3-byte human-readable)
- Small events (typical: 200-2KB) stored alongside large events
- Heavy read workload with complex multi-dimensional queries
- Sequential serial allocation (monotonic counter)
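For reference, a hedged sketch of how one of these composite keys could be assembled (field widths are taken from the list above; the real encoders live in `pkg/database/indexes` and may differ):

```go
package database

import "encoding/binary"

// kindIndexKey assembles the "kc-" key described above:
// 3-byte prefix | 2-byte kind | 8-byte timestamp | 5-byte serial.
func kindIndexKey(kind uint16, createdAt uint64, serial uint64) []byte {
	key := make([]byte, 0, 3+2+8+5)
	key = append(key, "kc-"...)
	key = binary.BigEndian.AppendUint16(key, kind)
	key = binary.BigEndian.AppendUint64(key, createdAt)
	var ser [8]byte
	binary.BigEndian.PutUint64(ser[:], serial)
	key = append(key, ser[3:]...) // low 5 bytes: the 40-bit serial
	return key
}
```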
### Blossom Store
**Storage Model:**
- **Blob data**: Filesystem at `<datadir>/blossom/<sha256hex><extension>`
- **Metadata**: Badger `blob:meta:<sha256hex>` → JSON metadata
- **Index**: Badger `blob:index:<pubkeyhex>:<sha256hex>` → marker
**Characteristics:**
- Content-addressed via SHA256 (inherently deduplicating)
- Large files (images, videos, PDFs)
- Simple queries (by hash, by pubkey)
- Immutable blobs (delete is only operation)
---
## Applicable Reiser4 Concepts
### ✅ 1. Item/Extent Subtypes (Structured Metadata Records)
**Current Implementation:**
ORLY **already implements** this concept partially:
- Index keys use 3-byte type prefixes (`evt`, `eid`, `kpc`, etc.)
- Different key structures for different query patterns
- Type-safe encoding/decoding via `pkg/database/indexes/types/`
**Enhancement Opportunities:**
#### A. Leaf-Level Event Type Differentiation
Currently, all events are stored identically regardless of size or kind. Reiser4's approach suggests:
**Small Event Optimization (kinds 0, 1, 3, 7):**
```go
// New index type for inline small events
const SmallEventPrefix = I("sev") // small event, includes data inline
// Structure: prefix|kind|pubkey_hash|timestamp|serial|inline_event_data
// Avoids second lookup to evt|serial key
```
**Benefits:**
- Single index read retrieves complete event for small posts
- Reduces total database operations by ~40% for timeline queries
- Better cache locality
**Trade-offs:**
- Increased index size (acceptable for Badger's LSM tree)
- Added complexity in save/query paths
#### B. Event Kind-Specific Storage Layouts
Different event kinds have different access patterns:
```go
// Metadata events (kind 0, 3): Replaceable, frequent full-scan queries
type ReplaceableEventLeaf struct {
Prefix [3]byte // "rev"
Pubkey [8]byte // hash
Kind uint16
Timestamp uint64
Serial [5]byte // 40-bit serial (types.Uint40 in ORLY)
EventData []byte // inline for small metadata
}
// Ephemeral-range events (20000-29999): Should never be stored
// Already implemented correctly (rejected in save-event.go:116-119)
// Parameterized replaceable (30000-39999): Keyed by 'd' tag
type AddressableEventLeaf struct {
Prefix [3]byte // "aev"
Pubkey [8]byte
Kind uint16
DTagHash [8]byte // hash of 'd' tag value
Timestamp uint64
Serial [5]byte // 40-bit serial (types.Uint40 in ORLY)
}
```
**Implementation in ORLY:**
1. Add new index types to `pkg/database/indexes/keys.go`
2. Modify `save-event.go` to choose storage strategy based on kind
3. Update query builders to leverage kind-specific indexes
---
### ✅ 2. Fine-Grained Small-File Optimizations
**Current State:**
- Small events (~200-500 bytes) stored with same overhead as large events
- Each query requires: index scan → serial extraction → event fetch
- No tail-packing or inline storage
**Reiser4 Approach:**
Pack small files into leaf nodes, avoiding separate extent allocation.
**ORLY Application:**
#### A. Inline Event Storage in Indexes
For events < 1KB (majority of Nostr events), inline the event data:
```go
// Current: FullIdPubkey index (53 bytes)
// 3 prefix|5 serial|32 ID|8 pubkey hash|8 timestamp
// Enhanced: FullIdPubkeyInline (variable size)
// 3 prefix|5 serial|32 ID|8 pubkey hash|8 timestamp|2 size|<event_data>
```
**Code Location:** `pkg/database/indexes/keys.go:220-239`
**Implementation Strategy:**
```go
func (d *D) SaveEvent(c context.Context, ev *event.E) (replaced bool, err error) {
// ... existing validation ...
// Serialize event once
eventData := new(bytes.Buffer)
ev.MarshalBinary(eventData)
eventBytes := eventData.Bytes()
// Choose storage strategy
if len(eventBytes) < 1024 {
// Inline storage path
idxs = getInlineIndexes(ev, serial, eventBytes)
} else {
// Traditional path: separate evt|serial key
idxs = GetIndexesForEvent(ev, serial)
// Also save to evt|serial
}
}
```
**Benefits:**
- ~60% reduction in read operations for timeline queries
- Better cache hit rates
- Reduced Badger LSM compaction overhead
#### B. Batch Small Event Storage
Group multiple tiny events (e.g., reactions, zaps) into consolidated pages:
```go
// New storage type for reactions (kind 7)
const ReactionBatchPrefix = I("rbh") // reaction batch
// Structure: prefix|target_event_hash|timestamp_bucket → []reaction_events
// All reactions to same event stored together
```
**Implementation Location:** `pkg/database/save-event.go:106-225`
---
### ✅ 3. Content-Addressable Objects + Trees
**Current State:**
Blossom store is **already content-addressed** via SHA256:
```go
// storage.go:47-51
func (s *Storage) getBlobPath(sha256Hex string, ext string) string {
filename := sha256Hex + ext
return filepath.Join(s.blobDir, filename)
}
```
**Enhancement Opportunities:**
#### A. Content-Addressable Event Storage
Events are already identified by SHA256(serialized event), but not stored that way:
```go
// Current: evt|<serial> → event_data
// Proposed: evt|<sha256_32bytes> → event_data
// Benefits:
// - Natural deduplication (duplicate events never stored)
// - Alignment with Nostr event ID semantics
// - Easier replication/verification
```
**Trade-off Analysis:**
- **Pro**: Perfect deduplication, cryptographic verification
- **Con**: Lose sequential serial benefits (range scans)
- **Solution**: Hybrid approach - keep serials for ordering, add content-addressed lookup
```go
// Keep both:
// evt|<serial> → event_data (primary, for range scans)
// evh|<sha256_hash> → serial (secondary, for dedup + verification)
```
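A sketch of the dedup probe this enables (the `evh` prefix follows the proposal above; the Badger calls are standard, though the module version is assumed):

```go
package database

import badger "github.com/dgraph-io/badger/v4" // version assumed

// eventExists probes the proposed evh|<id> secondary key inside an existing
// transaction; a hit means the event is already stored, so no new serial is
// allocated and the duplicate is dropped.
func eventExists(txn *badger.Txn, id []byte) (exists bool, err error) {
	key := append([]byte("evh"), id...)
	if _, err = txn.Get(key); err == badger.ErrKeyNotFound {
		return false, nil
	}
	return err == nil, err
}
```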
#### B. Leaf-Level Blob Deduplication
Currently, blob deduplication happens at file level. Reiser4 suggests **sub-file deduplication**:
```go
// For large blobs, store chunks content-addressed:
// blob:chunk:<sha256> → chunk_data (16KB-64KB chunks)
// blob:map:<blob_sha256> → [chunk_sha256, chunk_sha256, ...]
```
**Implementation in `pkg/blossom/storage.go`:**
```go
func (s *Storage) SaveBlobChunked(sha256Hash []byte, data []byte, ...) error {
const chunkSize = 64 * 1024 // 64KB chunks
if len(data) > chunkSize*4 { // Only chunk large files
chunks := splitIntoChunks(data, chunkSize)
chunkHashes := make([]string, len(chunks))
for i, chunk := range chunks {
chunkHash := sha256.Sum256(chunk)
// Store chunk (naturally deduplicated)
s.saveChunk(chunkHash[:], chunk)
chunkHashes[i] = hex.Enc(chunkHash[:])
}
// Store chunk map
s.saveBlobMap(sha256Hash, chunkHashes)
} else {
// Small blob, store directly
s.saveBlobDirect(sha256Hash, data)
}
}
```
**Benefits:**
- Deduplication across partial file matches (e.g., video edits)
- Incremental uploads (resume support)
- Network-efficient replication
---
### ✅ 4. Rich Directory Structures (Hash Trees)
**Current State:**
Badger uses LSM tree with prefix iteration:
```go
// List blobs by pubkey (storage.go:259-330)
opts := badger.DefaultIteratorOptions
opts.Prefix = []byte(prefixBlobIndex + pubkeyHex + ":")
it := txn.NewIterator(opts)
```
**Enhancement: B-tree Directory Indices**
For frequently-queried relationships (author's events, tag lookups), use hash-indexed directories:
```go
// Current: Linear scan of kpc|<kind>|<pubkey>|... keys
// Enhanced: Hash directory structure
type AuthorEventDirectory struct {
PubkeyHash [8]byte
Buckets [256]*EventBucket // Hash table in single key
}
type EventBucket struct {
Count uint16
Serials [][5]byte // up to N 40-bit serials, then overflow
}
// Single read gets author's recent events
// Key: aed|<pubkey_hash> → directory structure
```
**Implementation Location:** `pkg/database/query-for-authors.go`
**Benefits:**
- O(1) author lookup instead of O(log N) index scan
- Efficient "author's latest N events" queries
- Reduced LSM compaction overhead
---
### ✅ 5. Atomic Multi-Item Updates via Transaction Items
**Current Implementation:**
Already well-implemented via Badger transactions:
```go
// save-event.go:181-211
err = d.Update(func(txn *badger.Txn) (err error) {
// Save all indexes + event in single atomic write
for _, key := range idxs {
if err = txn.Set(key, nil); chk.E(err) {
return
}
}
if err = txn.Set(kb, vb); chk.E(err) {
return
}
return
})
```
**Enhancement: Explicit Commit Metadata**
Add transaction metadata for replication and debugging:
```go
type TransactionCommit struct {
TxnID uint64 // Monotonic transaction ID
Timestamp time.Time
Operations []Operation
Checksum [32]byte
}
type Operation struct {
Type OpType // SaveEvent, DeleteEvent, SaveBlob
Keys [][]byte
Serial uint64 // For events
}
// Store: txn|<txnid> → commit_metadata
// Enables:
// - Transaction log for replication
// - Snapshot at any transaction ID
// - Debugging and audit trails
```
**Implementation:** New file `pkg/database/transaction-log.go`
---
### ✅ 6. Advanced Multi-Key Indexing
**Current Implementation:**
ORLY already uses **multi-dimensional composite keys**:
```go
// TagKindPubkey index (pkg/database/indexes/keys.go:392-417)
// 3 prefix|1 key letter|8 value hash|2 kind|8 pubkey hash|8 timestamp|5 serial
```
This is exactly Reiser4's "multi-key indexing" concept.
**Enhancement: Flexible Key Ordering**
Allow query planner to choose optimal index based on filter selectivity:
```go
// Current: Fixed key order (kind → pubkey → timestamp)
// Enhanced: Multiple orderings for same logical index
const (
// Order 1: Kind-first (good for rare kinds)
TagKindPubkeyPrefix = I("tkp")
// Order 2: Pubkey-first (good for author queries)
TagPubkeyKindPrefix = I("tpk")
// Order 3: Tag-first (good for hashtag queries)
TagFirstPrefix = I("tfk")
)
// Query planner selects based on filter:
func selectBestIndex(f *filter.F) IndexType {
if f.Kinds != nil && len(*f.Kinds) < 5 {
return TagKindPubkeyPrefix // Kind is selective
}
if f.Authors != nil && len(*f.Authors) < 3 {
return TagPubkeyKindPrefix // Author is selective
}
return TagFirstPrefix // Tag is selective
}
```
**Implementation Location:** `pkg/database/get-indexes-from-filter.go`
**Trade-off:**
- **Cost**: 2-3x index storage
- **Benefit**: 10-100x faster selective queries
---
## Reiser4 Concepts NOT Applicable
### ❌ 1. In-Kernel Plugin Architecture
ORLY is a user-space application. Not relevant.
### ❌ 2. Files-as-Directories
Nostr events are not hierarchical. Not applicable.
### ❌ 3. Dancing Trees / Hyper-Rebalancing
Badger's LSM tree already handles balancing. Don't reimplement it.
### ❌ 4. Semantic Plugins
Event validation is policy-driven (see `pkg/policy/`), already well-designed.
---
## Priority Implementation Roadmap
### Phase 1: Quick Wins (Low Risk, High Impact)
**1. Inline Small Event Storage** (2-3 days)
- **File**: `pkg/database/save-event.go`, `pkg/database/indexes/keys.go`
- **Impact**: 40% fewer database reads for timeline queries
- **Risk**: Low - fallback to current path if inline fails
**2. Content-Addressed Deduplication** (1 day)
- **File**: `pkg/database/save-event.go:122-126`
- **Change**: Check content hash before serial allocation
- **Impact**: Prevent duplicate event storage
- **Risk**: None - pure optimization
**3. Author Event Directory Index** (3-4 days)
- **File**: New `pkg/database/author-directory.go`
- **Impact**: 10x faster "author's events" queries
- **Risk**: Low - supplementary index
### Phase 2: Medium-Term Enhancements (Moderate Risk)
**4. Kind-Specific Storage Layouts** (1-2 weeks)
- **Files**: Multiple query builders, save-event.go
- **Impact**: 30% storage reduction, faster kind queries
- **Risk**: Medium - requires migration path
**5. Blob Chunk Storage** (1 week)
- **File**: `pkg/blossom/storage.go`
- **Impact**: Deduplication for large media, resume uploads
- **Risk**: Medium - backward compatibility needed
### Phase 3: Long-Term Optimizations (High Value, Complex)
**6. Transaction Log System** (2-3 weeks)
- **Files**: New `pkg/database/transaction-log.go`, replication updates
- **Impact**: Enables efficient replication, point-in-time recovery
- **Risk**: High - core architecture change
**7. Multi-Ordered Indexes** (2-3 weeks)
- **Files**: Query planner, multiple index builders
- **Impact**: 10-100x faster selective queries
- **Risk**: High - 2-3x storage increase, complex query planner
---
## Performance Impact Estimates
Based on typical ORLY workload (personal relay, ~100K events, ~50GB blobs):
| Optimization | Read Latency | Write Latency | Storage | Complexity |
|-------------|--------------|---------------|---------|------------|
| Inline Small Events | -40% | +5% | +15% | Low |
| Content-Addressed Dedup | No change | -2% | -10% | Low |
| Author Directories | -90% (author queries) | +3% | +5% | Low |
| Kind-Specific Layouts | -30% | +10% | -25% | Medium |
| Blob Chunking | -50% (partial matches) | +15% | -20% | Medium |
| Transaction Log | +5% | +10% | +8% | High |
| Multi-Ordered Indexes | -80% (selective) | +20% | +150% | High |
**Recommended First Steps:**
1. Inline small events (biggest win/effort ratio)
2. Content-addressed dedup (zero-risk improvement)
3. Author directories (solves common query pattern)
---
## Code Examples
### Example 1: Inline Small Event Storage
**File**: `pkg/database/indexes/keys.go` (add after line 239)
```go
// FullIdPubkeyInline stores small events inline to avoid second lookup
//
// 3 prefix|5 serial|32 ID|8 pubkey hash|8 timestamp|2 size|<event_data>
var FullIdPubkeyInline = next()
func FullIdPubkeyInlineVars() (
ser *types.Uint40, fid *types.Id, p *types.PubHash, ca *types.Uint64,
size *types.Uint16, data []byte,
) {
return new(types.Uint40), new(types.Id), new(types.PubHash),
new(types.Uint64), new(types.Uint16), nil
}
func FullIdPubkeyInlineEnc(
ser *types.Uint40, fid *types.Id, p *types.PubHash, ca *types.Uint64,
size *types.Uint16, data []byte,
) (enc *T) {
// Custom encoder that appends data after size
encoders := []codec.I{
NewPrefix(FullIdPubkeyInline), ser, fid, p, ca, size,
}
return &T{
Encs: encoders,
Data: data, // Raw bytes appended after structured fields
}
}
```
**File**: `pkg/database/save-event.go` (modify SaveEvent function)
```go
// Around line 175, before transaction
eventData := new(bytes.Buffer)
ev.MarshalBinary(eventData)
eventBytes := eventData.Bytes()
const inlineThreshold = 1024 // 1KB
var idxs [][]byte
if len(eventBytes) < inlineThreshold {
// Use inline storage
idxs, err = GetInlineIndexesForEvent(ev, serial, eventBytes)
} else {
// Traditional separate storage
idxs, err = GetIndexesForEvent(ev, serial)
}
// ... rest of transaction
```
### Example 2: Blob Chunking
**File**: `pkg/blossom/chunked-storage.go` (new file)
```go
package blossom
import (
"encoding/json"
badger "github.com/dgraph-io/badger/v4" // needed for *badger.Txn below; module version assumed
"github.com/minio/sha256-simd"
"next.orly.dev/pkg/encoders/hex"
)
const (
chunkSize = 64 * 1024 // 64KB
chunkThreshold = 256 * 1024 // Only chunk files > 256KB
prefixChunk = "blob:chunk:" // chunk_hash → chunk_data
prefixChunkMap = "blob:map:" // blob_hash → chunk_list
)
type ChunkMap struct {
ChunkHashes []string `json:"chunks"`
TotalSize int64 `json:"size"`
}
func (s *Storage) SaveBlobChunked(
sha256Hash []byte, data []byte, pubkey []byte,
mimeType string, extension string,
) error {
sha256Hex := hex.Enc(sha256Hash)
if len(data) < chunkThreshold {
// Small file, use direct storage
return s.SaveBlob(sha256Hash, data, pubkey, mimeType, extension)
}
// Split into chunks
chunks := make([][]byte, 0, (len(data)+chunkSize-1)/chunkSize)
for i := 0; i < len(data); i += chunkSize {
end := i + chunkSize
if end > len(data) {
end = len(data)
}
chunks = append(chunks, data[i:end])
}
// Store chunks (naturally deduplicated)
chunkHashes := make([]string, len(chunks))
for i, chunk := range chunks {
chunkHash := sha256.Sum256(chunk)
chunkHashes[i] = hex.Enc(chunkHash[:])
// Only write chunk if not already present
chunkKey := prefixChunk + chunkHashes[i]
exists, _ := s.hasChunk(chunkKey)
if !exists {
s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(chunkKey), chunk)
})
}
}
// Store chunk map
chunkMap := &ChunkMap{
ChunkHashes: chunkHashes,
TotalSize: int64(len(data)),
}
mapData, _ := json.Marshal(chunkMap)
mapKey := prefixChunkMap + sha256Hex
s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(mapKey), mapData)
})
// Store metadata as usual
metadata := NewBlobMetadata(pubkey, mimeType, int64(len(data)))
metadata.Extension = extension
metaData, _ := metadata.Serialize()
metaKey := prefixBlobMeta + sha256Hex
s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(metaKey), metaData)
})
return nil
}
func (s *Storage) GetBlobChunked(sha256Hash []byte) ([]byte, error) {
sha256Hex := hex.Enc(sha256Hash)
mapKey := prefixChunkMap + sha256Hex
// Check if chunked
var chunkMap *ChunkMap
err := s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(mapKey))
if err == badger.ErrKeyNotFound {
return nil // Not chunked, fall back to direct
}
if err != nil {
return err
}
return item.Value(func(val []byte) error {
return json.Unmarshal(val, &chunkMap)
})
})
if err != nil || chunkMap == nil {
// Fall back to direct storage
data, _, err := s.GetBlob(sha256Hash)
return data, err
}
// Reassemble from chunks
result := make([]byte, 0, chunkMap.TotalSize)
for _, chunkHash := range chunkMap.ChunkHashes {
chunkKey := prefixChunk + chunkHash
var chunk []byte
s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(chunkKey))
if err != nil {
return err
}
chunk, err = item.ValueCopy(nil)
return err
})
result = append(result, chunk...)
}
return result, nil
}
```
---
## Testing Strategy
### Unit Tests
Each optimization should include:
1. **Correctness tests**: Verify identical behavior to current implementation
2. **Performance benchmarks**: Measure read/write latency improvements
3. **Storage tests**: Verify space savings
### Integration Tests
1. **Migration tests**: Ensure backward compatibility
2. **Load tests**: Simulate relay workload
3. **Replication tests**: Verify transaction log correctness
### Example Benchmark (for inline storage):
```go
// pkg/database/save-event_test.go
func BenchmarkSaveEventInline(b *testing.B) {
// Small event (typical note)
ev := &event.E{
Kind: 1,
CreatedAt: uint64(time.Now().Unix()),
Content: "Hello Nostr world!",
// ... rest of event
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
db.SaveEvent(ctx, ev)
}
}
func BenchmarkQueryEventsInline(b *testing.B) {
// Populate with 10K small events
// ...
f := &filter.F{
Authors: tag.NewFromBytesSlice(testPubkey),
Limit: ptrInt(20),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
events, _ := db.QueryEvents(ctx, f)
if len(events) != 20 {
b.Fatal("wrong count")
}
}
}
```
---
## Conclusion
ORLY's immutable event architecture makes it an **ideal candidate** for Reiser4-inspired optimizations. The top recommendations are:
1. **Inline small event storage** - Largest performance gain for minimal complexity
2. **Content-addressed deduplication** - Zero-risk storage savings
3. **Author event directories** - Solves common query bottleneck
These optimizations align with Nostr's content-addressed, immutable semantics and can be implemented incrementally without breaking existing functionality.
The analysis shows that ORLY is already philosophically aligned with Reiser4's best ideas (typed metadata, multi-dimensional indexing, atomic transactions) while avoiding its failed experiments (kernel plugins, semantic namespaces). Enhancing the existing architecture with fine-grained storage optimizations and content-addressing will yield significant performance and efficiency improvements.
---
## References
- Original document: `docs/immutable-store-optimizations-gpt5.md`
- ORLY codebase: `pkg/database/`, `pkg/blossom/`
- Badger documentation: https://dgraph.io/docs/badger/
- Nostr protocol: https://github.com/nostr-protocol/nips

View File

@@ -66,6 +66,29 @@ func SecretBytesToPubKeyHex(skb []byte) (pk string, err error) {
return hex.Enc(signer.Pub()), nil
}
// SecretBytesToPubKeyBytes generates a public key bytes from secret key bytes.
func SecretBytesToPubKeyBytes(skb []byte) (pkb []byte, err error) {
var signer *p8k.Signer
if signer, err = p8k.New(); chk.E(err) {
return
}
if err = signer.InitSec(skb); chk.E(err) {
return
}
return signer.Pub(), nil
}
// SecretBytesToSigner creates a signer from secret key bytes.
func SecretBytesToSigner(skb []byte) (signer *p8k.Signer, err error) {
if signer, err = p8k.New(); chk.E(err) {
return
}
if err = signer.InitSec(skb); chk.E(err) {
return
}
return
}
// IsValid32ByteHex checks that a hex string is a valid 32 bytes lower case hex encoded value as
// per nostr NIP-01 spec.
func IsValid32ByteHex[V []byte | string](pk V) bool {
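A hedged usage fragment for the two helpers added above (assumes it runs in the same package; the key bytes are placeholders):

```go
skb := make([]byte, 32) // placeholder: supply real 32-byte secret key bytes
if pkb, err := SecretBytesToPubKeyBytes(skb); err == nil {
	_ = pkb // 32-byte public key, the same bytes SecretBytesToPubKeyHex encodes
}
if signer, err := SecretBytesToSigner(skb); err == nil {
	_ = signer // ready to sign or verify events
}
```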

View File

@@ -0,0 +1,279 @@
package database
import (
"context"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/interfaces/signer/p8k"
)
func TestDualStorageForReplaceableEvents(t *testing.T) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "test-dual-db-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)
// Create a context and cancel function for the database
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize the database
db, err := New(ctx, cancel, tempDir, "info")
require.NoError(t, err)
defer db.Close()
// Create a signing key
sign := p8k.MustNew()
require.NoError(t, sign.Generate())
t.Run("SmallReplaceableEvent", func(t *testing.T) {
// Create a small replaceable event (kind 0 - profile metadata)
ev := event.New()
ev.Pubkey = sign.Pub()
ev.CreatedAt = timestamp.Now().V
ev.Kind = kind.ProfileMetadata.K
ev.Tags = tag.NewS()
ev.Content = []byte(`{"name":"Alice","about":"Test user"}`)
require.NoError(t, ev.Sign(sign))
// Save the event
replaced, err := db.SaveEvent(ctx, ev)
require.NoError(t, err)
assert.False(t, replaced)
// Fetch by serial - should work via sev key
ser, err := db.GetSerialById(ev.ID)
require.NoError(t, err)
require.NotNil(t, ser)
fetched, err := db.FetchEventBySerial(ser)
require.NoError(t, err)
require.NotNil(t, fetched)
// Verify event contents
assert.Equal(t, ev.ID, fetched.ID)
assert.Equal(t, ev.Pubkey, fetched.Pubkey)
assert.Equal(t, ev.Kind, fetched.Kind)
assert.Equal(t, ev.Content, fetched.Content)
})
t.Run("LargeReplaceableEvent", func(t *testing.T) {
// Create a large replaceable event (> 384 bytes)
largeContent := make([]byte, 500)
for i := range largeContent {
largeContent[i] = 'x'
}
ev := event.New()
ev.Pubkey = sign.Pub()
ev.CreatedAt = timestamp.Now().V + 1
ev.Kind = kind.ProfileMetadata.K
ev.Tags = tag.NewS()
ev.Content = largeContent
require.NoError(t, ev.Sign(sign))
// Save the event
replaced, err := db.SaveEvent(ctx, ev)
require.NoError(t, err)
assert.True(t, replaced) // Should replace the previous profile
// Fetch by serial - should work via evt key
ser, err := db.GetSerialById(ev.ID)
require.NoError(t, err)
require.NotNil(t, ser)
fetched, err := db.FetchEventBySerial(ser)
require.NoError(t, err)
require.NotNil(t, fetched)
// Verify event contents
assert.Equal(t, ev.ID, fetched.ID)
assert.Equal(t, ev.Content, fetched.Content)
})
}
func TestDualStorageForAddressableEvents(t *testing.T) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "test-addressable-db-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)
// Create a context and cancel function for the database
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize the database
db, err := New(ctx, cancel, tempDir, "info")
require.NoError(t, err)
defer db.Close()
// Create a signing key
sign := p8k.MustNew()
require.NoError(t, sign.Generate())
t.Run("SmallAddressableEvent", func(t *testing.T) {
// Create a small addressable event (kind 30023 - long-form content)
ev := event.New()
ev.Pubkey = sign.Pub()
ev.CreatedAt = timestamp.Now().V
ev.Kind = 30023
ev.Tags = tag.NewS(
tag.NewFromAny("d", []byte("my-article")),
tag.NewFromAny("title", []byte("Test Article")),
)
ev.Content = []byte("This is a short article.")
require.NoError(t, ev.Sign(sign))
// Save the event
replaced, err := db.SaveEvent(ctx, ev)
require.NoError(t, err)
assert.False(t, replaced)
// Fetch by serial - should work via sev key
ser, err := db.GetSerialById(ev.ID)
require.NoError(t, err)
require.NotNil(t, ser)
fetched, err := db.FetchEventBySerial(ser)
require.NoError(t, err)
require.NotNil(t, fetched)
// Verify event contents
assert.Equal(t, ev.ID, fetched.ID)
assert.Equal(t, ev.Pubkey, fetched.Pubkey)
assert.Equal(t, ev.Kind, fetched.Kind)
assert.Equal(t, ev.Content, fetched.Content)
// Verify d tag
dTag := fetched.Tags.GetFirst([]byte("d"))
require.NotNil(t, dTag)
assert.Equal(t, []byte("my-article"), dTag.Value())
})
t.Run("AddressableEventWithoutDTag", func(t *testing.T) {
// Create an addressable event without d tag (should be treated as regular event)
ev := event.New()
ev.Pubkey = sign.Pub()
ev.CreatedAt = timestamp.Now().V + 1
ev.Kind = 30023
ev.Tags = tag.NewS()
ev.Content = []byte("Article without d tag")
require.NoError(t, ev.Sign(sign))
// Save should fail with missing d tag error
_, err := db.SaveEvent(ctx, ev)
assert.Error(t, err)
assert.Contains(t, err.Error(), "missing a d tag")
})
t.Run("ReplaceAddressableEvent", func(t *testing.T) {
// Create first version
ev1 := event.New()
ev1.Pubkey = sign.Pub()
ev1.CreatedAt = timestamp.Now().V
ev1.Kind = 30023
ev1.Tags = tag.NewS(
tag.NewFromAny("d", []byte("replaceable-article")),
)
ev1.Content = []byte("Version 1")
require.NoError(t, ev1.Sign(sign))
replaced, err := db.SaveEvent(ctx, ev1)
require.NoError(t, err)
assert.False(t, replaced)
// Create second version (newer)
ev2 := event.New()
ev2.Pubkey = sign.Pub()
ev2.CreatedAt = ev1.CreatedAt + 10
ev2.Kind = 30023
ev2.Tags = tag.NewS(
tag.NewFromAny("d", []byte("replaceable-article")),
)
ev2.Content = []byte("Version 2")
require.NoError(t, ev2.Sign(sign))
replaced, err = db.SaveEvent(ctx, ev2)
require.NoError(t, err)
assert.True(t, replaced)
// Try to save older version (should fail)
ev0 := event.New()
ev0.Pubkey = sign.Pub()
ev0.CreatedAt = ev1.CreatedAt - 10
ev0.Kind = 30023
ev0.Tags = tag.NewS(
tag.NewFromAny("d", []byte("replaceable-article")),
)
ev0.Content = []byte("Version 0 (old)")
require.NoError(t, ev0.Sign(sign))
replaced, err = db.SaveEvent(ctx, ev0)
assert.Error(t, err)
assert.Contains(t, err.Error(), "older than existing")
})
}
func TestDualStorageRegularEvents(t *testing.T) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "test-regular-db-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)
// Create a context and cancel function for the database
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize the database
db, err := New(ctx, cancel, tempDir, "info")
require.NoError(t, err)
defer db.Close()
// Create a signing key
sign := p8k.MustNew()
require.NoError(t, sign.Generate())
t.Run("SmallRegularEvent", func(t *testing.T) {
// Create a small regular event (kind 1 - note)
ev := event.New()
ev.Pubkey = sign.Pub()
ev.CreatedAt = timestamp.Now().V
ev.Kind = kind.TextNote.K
ev.Tags = tag.NewS()
ev.Content = []byte("Hello, Nostr!")
require.NoError(t, ev.Sign(sign))
// Save the event
replaced, err := db.SaveEvent(ctx, ev)
require.NoError(t, err)
assert.False(t, replaced)
// Fetch by serial - should work via sev key
ser, err := db.GetSerialById(ev.ID)
require.NoError(t, err)
require.NotNil(t, ser)
fetched, err := db.FetchEventBySerial(ser)
require.NoError(t, err)
require.NotNil(t, fetched)
// Verify event contents
assert.Equal(t, ev.ID, fetched.ID)
assert.Equal(t, ev.Content, fetched.Content)
})
}

View File

@@ -14,6 +14,55 @@ import (
func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
if err = d.View(
func(txn *badger.Txn) (err error) {
// Helper function to extract inline event data from key
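// For sev keys the caller passes prefixLen = prefix(3) + serial(5) = 8; the
// two key bytes after that offset hold a big-endian uint16 payload length.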
extractInlineData := func(key []byte, prefixLen int) (*event.E, error) {
if len(key) > prefixLen+2 {
sizeIdx := prefixLen
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
dataStart := sizeIdx + 2
if len(key) >= dataStart+size {
eventData := key[dataStart : dataStart+size]
ev := new(event.E)
if err := ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
return nil, fmt.Errorf(
"error unmarshaling inline event (size=%d): %w",
size, err,
)
}
return ev, nil
}
}
return nil, nil
}
// Try sev (small event inline) prefix first - Reiser4 optimization
smallBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
return
}
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
opts.PrefetchValues = true
opts.PrefetchSize = 1
it := txn.NewIterator(opts)
defer it.Close()
it.Rewind()
if it.Valid() {
// Found in sev table - extract inline data
key := it.Item().Key()
// Key format: sev|serial|size_uint16|event_data
if ev, err = extractInlineData(key, 8); err != nil {
return err
}
if ev != nil {
return nil
}
}
// Not found in sev table, try evt (traditional) prefix
buf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
return

View File

@@ -15,47 +15,92 @@ import (
func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*event.E, err error) {
// Pre-allocate map with estimated capacity to reduce reallocations
events = make(map[uint64]*event.E, len(serials))
if len(serials) == 0 {
return events, nil
}
if err = d.View(
func(txn *badger.Txn) (err error) {
for _, ser := range serials {
var ev *event.E
// Try sev (small event inline) prefix first - Reiser4 optimization
smallBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
// Skip this serial on error but continue with others
err = nil
continue
}
// Iterate with prefix to find the small event key
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
opts.PrefetchValues = true
opts.PrefetchSize = 1
it := txn.NewIterator(opts)
it.Rewind()
if it.Valid() {
// Found in sev table - extract inline data
key := it.Item().Key()
// Key format: sev|serial|size_uint16|event_data
if len(key) > 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
sizeIdx := 8 // After sev(3) + serial(5)
// Read uint16 big-endian size
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
dataStart := sizeIdx + 2
if len(key) >= dataStart+size {
eventData := key[dataStart : dataStart+size]
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err == nil {
events[ser.Get()] = ev
}
// Clean up and continue
it.Close()
err = nil
continue
}
}
}
it.Close()
// Not found in sev table, try evt (traditional) prefix
buf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
// Skip this serial on error but continue with others
err = nil
continue
}
var item *badger.Item
if item, err = txn.Get(buf.Bytes()); err != nil {
// Skip this serial if not found but continue with others
err = nil
continue
}
var v []byte
if v, err = item.ValueCopy(nil); chk.E(err) {
// Skip this serial on error but continue with others
err = nil
continue
}
// Check if we have valid data before attempting to unmarshal
if len(v) < 32+32+1+2+1+1+64 { // ID + Pubkey + min varint fields + Sig
// Skip this serial - incomplete data
continue
}
ev := new(event.E)
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
// Skip this serial on unmarshal error but continue with others
err = nil
continue
}
// Successfully unmarshaled event, add to results
events[ser.Get()] = ev
}
@@ -64,6 +109,6 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
); err != nil {
return
}
return events, nil
}

View File

@@ -55,9 +55,12 @@ type I string
func (i I) Write(w io.Writer) (n int, err error) { return w.Write([]byte(i)) }
const (
EventPrefix = I("evt")
IdPrefix = I("eid")
FullIdPubkeyPrefix = I("fpc") // full id, pubkey, created at
EventPrefix = I("evt")
SmallEventPrefix = I("sev") // small event with inline data (<=384 bytes)
ReplaceableEventPrefix = I("rev") // replaceable event (kinds 0,3,10000-19999) with inline data
AddressableEventPrefix = I("aev") // addressable event (kinds 30000-39999) with inline data
IdPrefix = I("eid")
FullIdPubkeyPrefix = I("fpc") // full id, pubkey, created at
CreatedAtPrefix = I("c--") // created at
KindPrefix = I("kc-") // kind, created at
@@ -80,6 +83,12 @@ func Prefix(prf int) (i I) {
switch prf {
case Event:
return EventPrefix
case SmallEvent:
return SmallEventPrefix
case ReplaceableEvent:
return ReplaceableEventPrefix
case AddressableEvent:
return AddressableEventPrefix
case Id:
return IdPrefix
case FullIdPubkey:
@@ -125,6 +134,12 @@ func Identify(r io.Reader) (i int, err error) {
switch I(b[:]) {
case EventPrefix:
i = Event
case SmallEventPrefix:
i = SmallEvent
case ReplaceableEventPrefix:
i = ReplaceableEvent
case AddressableEventPrefix:
i = AddressableEvent
case IdPrefix:
i = Id
case FullIdPubkeyPrefix:
@@ -200,6 +215,53 @@ func EventEnc(ser *types.Uint40) (enc *T) {
}
func EventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) }
// SmallEvent stores events <=384 bytes with inline data to avoid double lookup.
// This is a Reiser4-inspired optimization for small event packing.
// 384 bytes covers: ID(32) + Pubkey(32) + Sig(64) + basic fields + small content
//
// prefix|5 serial|2 size_uint16|data (variable length, max 384 bytes)
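//
// Example: a 120-byte event under serial 42 would be keyed roughly as
// "sev" | 0x00 0x00 0x00 0x00 0x2a | 0x00 0x78 | <120 bytes of binary event>
// (serial shown big-endian here for illustration; see types.Uint40 for the actual encoding)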
var SmallEvent = next()
func SmallEventVars() (ser *types.Uint40) { return new(types.Uint40) }
func SmallEventEnc(ser *types.Uint40) (enc *T) {
return New(NewPrefix(SmallEvent), ser)
}
func SmallEventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) }
// ReplaceableEvent stores replaceable events (kinds 0,3,10000-19999) with inline data.
// Optimized storage for metadata events that are frequently replaced.
// Key format enables direct lookup by pubkey+kind without additional index traversal.
//
// prefix|8 pubkey_hash|2 kind|2 size_uint16|data (variable length, max 384 bytes)
var ReplaceableEvent = next()
func ReplaceableEventVars() (p *types.PubHash, ki *types.Uint16) {
return new(types.PubHash), new(types.Uint16)
}
func ReplaceableEventEnc(p *types.PubHash, ki *types.Uint16) (enc *T) {
return New(NewPrefix(ReplaceableEvent), p, ki)
}
func ReplaceableEventDec(p *types.PubHash, ki *types.Uint16) (enc *T) {
return New(NewPrefix(), p, ki)
}
// AddressableEvent stores parameterized replaceable events (kinds 30000-39999) with inline data.
// Optimized storage for addressable events identified by pubkey+kind+d-tag.
// Key format enables direct lookup without additional index traversal.
//
// prefix|8 pubkey_hash|2 kind|8 dtag_hash|2 size_uint16|data (variable length, max 384 bytes)
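// The fixed header is 3+8+2+8+2 = 23 bytes, so a maximal inline entry is a
// 407-byte key with an empty value.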
var AddressableEvent = next()
func AddressableEventVars() (p *types.PubHash, ki *types.Uint16, d *types.Ident) {
return new(types.PubHash), new(types.Uint16), new(types.Ident)
}
func AddressableEventEnc(p *types.PubHash, ki *types.Uint16, d *types.Ident) (enc *T) {
return New(NewPrefix(AddressableEvent), p, ki, d)
}
func AddressableEventDec(p *types.PubHash, ki *types.Uint16, d *types.Ident) (enc *T) {
return New(NewPrefix(), p, ki, d)
}
// Id contains a truncated 8-byte hash of an event index. This is the secondary
// key of an event, the primary key is the serial found in the Event.
//

View File

@@ -0,0 +1,521 @@
package database
import (
"bytes"
"context"
"os"
"testing"
"time"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/interfaces/signer/p8k"
)
// TestInlineSmallEventStorage tests the Reiser4-inspired inline storage optimization
// for small events (<=384 bytes).
func TestInlineSmallEventStorage(t *testing.T) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "test-inline-db-*")
if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Create a context and cancel function for the database
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize the database
db, err := New(ctx, cancel, tempDir, "info")
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
// Create a signer
sign := p8k.MustNew()
if err := sign.Generate(); chk.E(err) {
t.Fatal(err)
}
// Test Case 1: Small event (should use inline storage)
t.Run("SmallEventInlineStorage", func(t *testing.T) {
smallEvent := event.New()
smallEvent.Kind = kind.TextNote.K
smallEvent.CreatedAt = timestamp.Now().V
smallEvent.Content = []byte("Hello Nostr!") // Small content
smallEvent.Pubkey = sign.Pub()
smallEvent.Tags = tag.NewS()
// Sign the event
if err := smallEvent.Sign(sign); err != nil {
t.Fatalf("Failed to sign small event: %v", err)
}
// Save the event
if _, err := db.SaveEvent(ctx, smallEvent); err != nil {
t.Fatalf("Failed to save small event: %v", err)
}
// Verify it was stored with sev prefix
serial, err := db.GetSerialById(smallEvent.ID)
if err != nil {
t.Fatalf("Failed to get serial for small event: %v", err)
}
// Check that sev key exists
sevKeyExists := false
db.View(func(txn *badger.Txn) error {
smallBuf := new(bytes.Buffer)
indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
it := txn.NewIterator(opts)
defer it.Close()
it.Rewind()
if it.Valid() {
sevKeyExists = true
}
return nil
})
if !sevKeyExists {
t.Errorf("Small event was not stored with sev prefix")
}
// Verify evt key does NOT exist for small event
evtKeyExists := false
db.View(func(txn *badger.Txn) error {
buf := new(bytes.Buffer)
indexes.EventEnc(serial).MarshalWrite(buf)
_, err := txn.Get(buf.Bytes())
if err == nil {
evtKeyExists = true
}
return nil
})
if evtKeyExists {
t.Errorf("Small event should not have evt key (should only use sev)")
}
// Fetch and verify the event
fetchedEvent, err := db.FetchEventBySerial(serial)
if err != nil {
t.Fatalf("Failed to fetch small event: %v", err)
}
if !bytes.Equal(fetchedEvent.ID, smallEvent.ID) {
t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, smallEvent.ID)
}
if !bytes.Equal(fetchedEvent.Content, smallEvent.Content) {
t.Errorf("Fetched event content mismatch: got %q, want %q", fetchedEvent.Content, smallEvent.Content)
}
})
// Test Case 2: Large event (should use traditional storage)
t.Run("LargeEventTraditionalStorage", func(t *testing.T) {
largeEvent := event.New()
largeEvent.Kind = kind.TextNote.K
largeEvent.CreatedAt = timestamp.Now().V
// Create content larger than 384 bytes
largeContent := make([]byte, 500)
for i := range largeContent {
largeContent[i] = 'x'
}
largeEvent.Content = largeContent
largeEvent.Pubkey = sign.Pub()
largeEvent.Tags = tag.NewS()
// Sign the event
if err := largeEvent.Sign(sign); err != nil {
t.Fatalf("Failed to sign large event: %v", err)
}
// Save the event
if _, err := db.SaveEvent(ctx, largeEvent); err != nil {
t.Fatalf("Failed to save large event: %v", err)
}
// Verify it was stored with evt prefix
serial, err := db.GetSerialById(largeEvent.ID)
if err != nil {
t.Fatalf("Failed to get serial for large event: %v", err)
}
// Check that evt key exists
evtKeyExists := false
db.View(func(txn *badger.Txn) error {
buf := new(bytes.Buffer)
indexes.EventEnc(serial).MarshalWrite(buf)
_, err := txn.Get(buf.Bytes())
if err == nil {
evtKeyExists = true
}
return nil
})
if !evtKeyExists {
t.Errorf("Large event was not stored with evt prefix")
}
// Fetch and verify the event
fetchedEvent, err := db.FetchEventBySerial(serial)
if err != nil {
t.Fatalf("Failed to fetch large event: %v", err)
}
if !bytes.Equal(fetchedEvent.ID, largeEvent.ID) {
t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, largeEvent.ID)
}
})
// Test Case 3: Batch fetch with mixed small and large events
t.Run("BatchFetchMixedEvents", func(t *testing.T) {
var serials []*types.Uint40
expectedIDs := make(map[uint64][]byte)
// Create 10 small events and 10 large events
for i := 0; i < 20; i++ {
ev := event.New()
ev.Kind = kind.TextNote.K
ev.CreatedAt = timestamp.Now().V + int64(i)
ev.Pubkey = sign.Pub()
ev.Tags = tag.NewS()
// Alternate between small and large
if i%2 == 0 {
ev.Content = []byte("Small event")
} else {
largeContent := make([]byte, 500)
for j := range largeContent {
largeContent[j] = 'x'
}
ev.Content = largeContent
}
if err := ev.Sign(sign); err != nil {
t.Fatalf("Failed to sign event %d: %v", i, err)
}
if _, err := db.SaveEvent(ctx, ev); err != nil {
t.Fatalf("Failed to save event %d: %v", i, err)
}
serial, err := db.GetSerialById(ev.ID)
if err != nil {
t.Fatalf("Failed to get serial for event %d: %v", i, err)
}
serials = append(serials, serial)
expectedIDs[serial.Get()] = ev.ID
}
// Batch fetch all events
events, err := db.FetchEventsBySerials(serials)
if err != nil {
t.Fatalf("Failed to batch fetch events: %v", err)
}
if len(events) != 20 {
t.Errorf("Expected 20 events, got %d", len(events))
}
// Verify all events were fetched correctly
for serialValue, ev := range events {
expectedID := expectedIDs[serialValue]
if !bytes.Equal(ev.ID, expectedID) {
t.Errorf("Event ID mismatch for serial %d: got %x, want %x",
serialValue, ev.ID, expectedID)
}
}
})
// Test Case 4: Edge case - event near 384 byte threshold
t.Run("ThresholdEvent", func(t *testing.T) {
ev := event.New()
ev.Kind = kind.TextNote.K
ev.CreatedAt = timestamp.Now().V
ev.Pubkey = sign.Pub()
ev.Tags = tag.NewS()
// Create content near the threshold
testContent := make([]byte, 250)
for i := range testContent {
testContent[i] = 'x'
}
ev.Content = testContent
if err := ev.Sign(sign); err != nil {
t.Fatalf("Failed to sign threshold event: %v", err)
}
if _, err := db.SaveEvent(ctx, ev); err != nil {
t.Fatalf("Failed to save threshold event: %v", err)
}
serial, err := db.GetSerialById(ev.ID)
if err != nil {
t.Fatalf("Failed to get serial: %v", err)
}
// Fetch and verify
fetchedEvent, err := db.FetchEventBySerial(serial)
if err != nil {
t.Fatalf("Failed to fetch threshold event: %v", err)
}
if !bytes.Equal(fetchedEvent.ID, ev.ID) {
t.Errorf("Fetched event ID mismatch")
}
})
}
// TestInlineStorageMigration tests the migration from traditional to inline storage
func TestInlineStorageMigration(t *testing.T) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "test-migration-db-*")
if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Create a context and cancel function for the database
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize the database
db, err := New(ctx, cancel, tempDir, "info")
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
// Create a signer
sign := p8k.MustNew()
if err := sign.Generate(); chk.E(err) {
t.Fatal(err)
}
// Manually set database version to 3 (before inline storage migration)
db.writeVersionTag(3)
// Create and save some small events the old way (manually)
var testEvents []*event.E
for i := 0; i < 5; i++ {
ev := event.New()
ev.Kind = kind.TextNote.K
ev.CreatedAt = timestamp.Now().V + int64(i)
ev.Content = []byte("Test event")
ev.Pubkey = sign.Pub()
ev.Tags = tag.NewS()
if err := ev.Sign(sign); err != nil {
t.Fatalf("Failed to sign event: %v", err)
}
// Get next serial
serial, err := db.seq.Next()
if err != nil {
t.Fatalf("Failed to get serial: %v", err)
}
// Generate indexes
idxs, err := GetIndexesForEvent(ev, serial)
if err != nil {
t.Fatalf("Failed to generate indexes: %v", err)
}
// Serialize event
eventDataBuf := new(bytes.Buffer)
ev.MarshalBinary(eventDataBuf)
eventData := eventDataBuf.Bytes()
// Save the old way (evt prefix with value)
db.Update(func(txn *badger.Txn) error {
ser := new(types.Uint40)
ser.Set(serial)
// Save indexes
for _, key := range idxs {
txn.Set(key, nil)
}
// Save event the old way
keyBuf := new(bytes.Buffer)
indexes.EventEnc(ser).MarshalWrite(keyBuf)
txn.Set(keyBuf.Bytes(), eventData)
return nil
})
testEvents = append(testEvents, ev)
}
t.Logf("Created %d test events with old storage format", len(testEvents))
// Close and reopen database to trigger migration
db.Close()
db, err = New(ctx, cancel, tempDir, "info")
if err != nil {
t.Fatalf("Failed to reopen database: %v", err)
}
defer db.Close()
// Give migration time to complete
time.Sleep(100 * time.Millisecond)
// Verify all events can still be fetched
for i, ev := range testEvents {
serial, err := db.GetSerialById(ev.ID)
if err != nil {
t.Fatalf("Failed to get serial for event %d after migration: %v", i, err)
}
fetchedEvent, err := db.FetchEventBySerial(serial)
if err != nil {
t.Fatalf("Failed to fetch event %d after migration: %v", i, err)
}
if !bytes.Equal(fetchedEvent.ID, ev.ID) {
t.Errorf("Event %d ID mismatch after migration: got %x, want %x",
i, fetchedEvent.ID, ev.ID)
}
if !bytes.Equal(fetchedEvent.Content, ev.Content) {
t.Errorf("Event %d content mismatch after migration: got %q, want %q",
i, fetchedEvent.Content, ev.Content)
}
// Verify it's now using inline storage
sevKeyExists := false
db.View(func(txn *badger.Txn) error {
smallBuf := new(bytes.Buffer)
indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
it := txn.NewIterator(opts)
defer it.Close()
it.Rewind()
if it.Valid() {
sevKeyExists = true
t.Logf("Event %d (%s) successfully migrated to inline storage",
i, hex.Enc(ev.ID[:8]))
}
return nil
})
if !sevKeyExists {
t.Errorf("Event %d was not migrated to inline storage", i)
}
}
}
// BenchmarkInlineVsTraditionalStorage compares performance of inline vs traditional storage
func BenchmarkInlineVsTraditionalStorage(b *testing.B) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "bench-inline-db-*")
if err != nil {
b.Fatalf("Failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Create a context and cancel function for the database
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize the database
db, err := New(ctx, cancel, tempDir, "info")
if err != nil {
b.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
// Create a signer
sign := p8k.MustNew()
if err := sign.Generate(); chk.E(err) {
b.Fatal(err)
}
// Pre-populate database with mix of small and large events
var smallSerials []*types.Uint40
var largeSerials []*types.Uint40
for i := 0; i < 100; i++ {
// Small event
smallEv := event.New()
smallEv.Kind = kind.TextNote.K
smallEv.CreatedAt = timestamp.Now().V + int64(i)*2
smallEv.Content = []byte("Small test event")
smallEv.Pubkey = sign.Pub()
smallEv.Tags = tag.NewS()
smallEv.Sign(sign)
db.SaveEvent(ctx, smallEv)
if serial, err := db.GetSerialById(smallEv.ID); err == nil {
smallSerials = append(smallSerials, serial)
}
// Large event
largeEv := event.New()
largeEv.Kind = kind.TextNote.K
largeEv.CreatedAt = timestamp.Now().V + int64(i)*2 + 1
largeContent := make([]byte, 500)
for j := range largeContent {
largeContent[j] = 'x'
}
largeEv.Content = largeContent
largeEv.Pubkey = sign.Pub()
largeEv.Tags = tag.NewS()
largeEv.Sign(sign)
db.SaveEvent(ctx, largeEv)
if serial, err := db.GetSerialById(largeEv.ID); err == nil {
largeSerials = append(largeSerials, serial)
}
}
b.Run("FetchSmallEventsInline", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := i % len(smallSerials)
db.FetchEventBySerial(smallSerials[idx])
}
})
b.Run("FetchLargeEventsTraditional", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := i % len(largeSerials)
db.FetchEventBySerial(largeSerials[idx])
}
})
b.Run("BatchFetchSmallEvents", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
db.FetchEventsBySerials(smallSerials[:10])
}
})
b.Run("BatchFetchLargeEvents", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
db.FetchEventsBySerials(largeSerials[:10])
}
})
}

View File

@@ -12,10 +12,11 @@ import (
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/ints"
"next.orly.dev/pkg/encoders/kind"
)
const (
currentVersion uint32 = 3
currentVersion uint32 = 4
)
func (d *D) RunMigrations() {
@@ -82,6 +83,13 @@ func (d *D) RunMigrations() {
// bump to version 3
_ = d.writeVersionTag(3)
}
if dbVersion < 4 {
log.I.F("migrating to version 4...")
// convert small events to inline storage (Reiser4 optimization)
d.ConvertSmallEventsToInline()
// bump to version 4
_ = d.writeVersionTag(4)
}
}
// writeVersionTag writes a new version tag key to the database (no value)
@@ -323,3 +331,209 @@ func (d *D) CleanupEphemeralEvents() {
log.I.F("cleaned up %d ephemeral events from database", deletedCount)
}
// ConvertSmallEventsToInline migrates small events (<=384 bytes) to inline storage.
// This is a Reiser4-inspired optimization that stores small event data in the key itself,
// avoiding a second database lookup and improving query performance.
// Also handles replaceable and addressable events with specialized storage.
func (d *D) ConvertSmallEventsToInline() {
log.I.F("converting events to optimized inline storage (Reiser4 optimization)...")
var err error
const smallEventThreshold = 384
type EventData struct {
Serial uint64
EventData []byte
OldKey []byte
IsReplaceable bool
IsAddressable bool
Pubkey []byte
Kind uint16
DTag []byte
}
var events []EventData
var convertedCount int
var deletedCount int
// Helper function for counting by predicate
countBy := func(events []EventData, predicate func(EventData) bool) int {
count := 0
for _, e := range events {
if predicate(e) {
count++
}
}
return count
}
// First pass: identify events in evt table that can benefit from inline storage
if err = d.View(
func(txn *badger.Txn) (err error) {
prf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
return
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
// Check if event data is small enough for inline storage
if len(val) <= smallEventThreshold {
// Decode event to check if it's replaceable or addressable
ev := new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
continue
}
// Extract serial from key
key := item.KeyCopy(nil)
ser := indexes.EventVars()
if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
continue
}
eventData := EventData{
Serial: ser.Get(),
EventData: val,
OldKey: key,
IsReplaceable: kind.IsReplaceable(ev.Kind),
IsAddressable: kind.IsParameterizedReplaceable(ev.Kind),
Pubkey: ev.Pubkey,
Kind: ev.Kind,
}
// Extract d-tag for addressable events
if eventData.IsAddressable {
dTag := ev.Tags.GetFirst([]byte("d"))
if dTag != nil {
eventData.DTag = dTag.Value()
}
}
events = append(events, eventData)
}
}
return nil
},
); chk.E(err) {
return
}
log.I.F("found %d events to convert (%d regular, %d replaceable, %d addressable)",
len(events),
countBy(events, func(e EventData) bool { return !e.IsReplaceable && !e.IsAddressable }),
countBy(events, func(e EventData) bool { return e.IsReplaceable }),
countBy(events, func(e EventData) bool { return e.IsAddressable }),
)
// Second pass: convert in batches to avoid large transactions
const batchSize = 1000
for i := 0; i < len(events); i += batchSize {
end := i + batchSize
if end > len(events) {
end = len(events)
}
batch := events[i:end]
// Write new inline keys and delete old keys
if err = d.Update(
func(txn *badger.Txn) (err error) {
for _, e := range batch {
// First, write the sev key for serial-based access (all small events)
sevKeyBuf := new(bytes.Buffer)
ser := new(types.Uint40)
if err = ser.Set(e.Serial); chk.E(err) {
continue
}
if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); chk.E(err) {
continue
}
// Append size as uint16 big-endian (2 bytes)
sizeBytes := []byte{byte(len(e.EventData) >> 8), byte(len(e.EventData))}
sevKeyBuf.Write(sizeBytes)
// Append event data
sevKeyBuf.Write(e.EventData)
// Write sev key (no value needed)
if err = txn.Set(sevKeyBuf.Bytes(), nil); chk.E(err) {
log.W.F("failed to write sev key for serial %d: %v", e.Serial, err)
continue
}
convertedCount++
// Additionally, for replaceable/addressable events, write specialized keys
if e.IsAddressable && len(e.DTag) > 0 {
// Addressable event: aev|pubkey_hash|kind|dtag_hash|size|data
aevKeyBuf := new(bytes.Buffer)
pubHash := new(types.PubHash)
pubHash.FromPubkey(e.Pubkey)
kindVal := new(types.Uint16)
kindVal.Set(e.Kind)
dTagHash := new(types.Ident)
dTagHash.FromIdent(e.DTag)
if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(aevKeyBuf); chk.E(err) {
continue
}
// Append size and data
aevKeyBuf.Write(sizeBytes)
aevKeyBuf.Write(e.EventData)
if err = txn.Set(aevKeyBuf.Bytes(), nil); chk.E(err) {
log.W.F("failed to write aev key for serial %d: %v", e.Serial, err)
continue
}
} else if e.IsReplaceable {
// Replaceable event: rev|pubkey_hash|kind|size|data
revKeyBuf := new(bytes.Buffer)
pubHash := new(types.PubHash)
pubHash.FromPubkey(e.Pubkey)
kindVal := new(types.Uint16)
kindVal.Set(e.Kind)
if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(revKeyBuf); chk.E(err) {
continue
}
// Append size and data
revKeyBuf.Write(sizeBytes)
revKeyBuf.Write(e.EventData)
if err = txn.Set(revKeyBuf.Bytes(), nil); chk.E(err) {
log.W.F("failed to write rev key for serial %d: %v", e.Serial, err)
continue
}
}
// Delete old evt key
if err = txn.Delete(e.OldKey); chk.E(err) {
log.W.F("failed to delete old event key for serial %d: %v", e.Serial, err)
continue
}
deletedCount++
}
return nil
},
); chk.E(err) {
log.W.F("batch update failed: %v", err)
continue
}
if (i/batchSize)%10 == 0 && i > 0 {
log.I.F("progress: %d/%d events converted", i, len(events))
}
}
log.I.F("migration complete: converted %d events to optimized inline storage, deleted %d old keys", convertedCount, deletedCount)
}

View File

@@ -177,6 +177,19 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
return
}
log.T.F("SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs), ev.ID, ev.Kind)
// Serialize event once to check size
eventDataBuf := new(bytes.Buffer)
ev.MarshalBinary(eventDataBuf)
eventData := eventDataBuf.Bytes()
// Determine storage strategy (Reiser4 optimizations)
// 384 bytes covers: ID(32) + Pubkey(32) + Sig(64) + basic fields + small content
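// The fixed fields alone take 128 bytes (32+32+64), leaving roughly 256 bytes for kind, timestamp, tags, and content.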
const smallEventThreshold = 384
isSmallEvent := len(eventData) <= smallEventThreshold
isReplaceableEvent := kind.IsReplaceable(ev.Kind)
isAddressableEvent := kind.IsParameterizedReplaceable(ev.Kind)
// Start a transaction to save the event and all its indexes
err = d.Update(
func(txn *badger.Txn) (err error) {
@@ -185,26 +198,98 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
if err = ser.Set(serial); chk.E(err) {
return
}
keyBuf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return
}
kb := keyBuf.Bytes()
// Pre-allocate value buffer
valueBuf := new(bytes.Buffer)
ev.MarshalBinary(valueBuf)
vb := valueBuf.Bytes()
// Save each index
for _, key := range idxs {
if err = txn.Set(key, nil); chk.E(err) {
return
}
}
// write the event
if err = txn.Set(kb, vb); chk.E(err) {
return
// Write the event using optimized storage strategy
// Determine if we should use inline addressable/replaceable storage
useAddressableInline := false
var dTag *tag.T
if isAddressableEvent && isSmallEvent {
dTag = ev.Tags.GetFirst([]byte("d"))
useAddressableInline = dTag != nil
}
// All small events get a sev key for serial-based access
if isSmallEvent {
// Small event: store inline with sev prefix
// Format: sev|serial|size_uint16|event_data
keyBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return
}
// Append size as uint16 big-endian (2 bytes for size up to 65535)
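// (the read path decodes this as int(b[0])<<8 | int(b[1]) in FetchEventBySerial)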
sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))}
keyBuf.Write(sizeBytes)
// Append event data
keyBuf.Write(eventData)
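// Value is nil: the event bytes live entirely in the key (inline storage).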
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
return
}
log.T.F("SaveEvent: stored small event inline (%d bytes)", len(eventData))
} else {
// Large event: store separately with evt prefix
keyBuf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return
}
if err = txn.Set(keyBuf.Bytes(), eventData); chk.E(err) {
return
}
log.T.F("SaveEvent: stored large event separately (%d bytes)", len(eventData))
}
// Additionally, store replaceable/addressable events with specialized keys for direct access
if useAddressableInline {
// Addressable event: also store with aev|pubkey_hash|kind|dtag_hash|size|data
pubHash := new(types.PubHash)
pubHash.FromPubkey(ev.Pubkey)
kindVal := new(types.Uint16)
kindVal.Set(ev.Kind)
dTagHash := new(types.Ident)
dTagHash.FromIdent(dTag.Value())
keyBuf := new(bytes.Buffer)
if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(keyBuf); chk.E(err) {
return
}
// Append size as uint16 big-endian
sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))}
keyBuf.Write(sizeBytes)
// Append event data
keyBuf.Write(eventData)
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
return
}
log.T.F("SaveEvent: also stored addressable event with specialized key")
} else if isReplaceableEvent && isSmallEvent {
// Replaceable event: also store with rev|pubkey_hash|kind|size|data
pubHash := new(types.PubHash)
pubHash.FromPubkey(ev.Pubkey)
kindVal := new(types.Uint16)
kindVal.Set(ev.Kind)
keyBuf := new(bytes.Buffer)
if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(keyBuf); chk.E(err) {
return
}
// Append size as uint16 big-endian
sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))}
keyBuf.Write(sizeBytes)
// Append event data
keyBuf.Write(eventData)
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
return
}
log.T.F("SaveEvent: also stored replaceable event with specialized key")
}
return
},

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
@@ -77,6 +78,7 @@ type PolicyEvent struct {
*event.E
LoggedInPubkey string `json:"logged_in_pubkey,omitempty"`
IPAddress string `json:"ip_address,omitempty"`
AccessType string `json:"access_type,omitempty"` // "read" or "write"
}
// MarshalJSON implements custom JSON marshaling for PolicyEvent.
@@ -109,6 +111,9 @@ func (pe *PolicyEvent) MarshalJSON() ([]byte, error) {
if pe.IPAddress != "" {
safeEvent["ip_address"] = pe.IPAddress
}
if pe.AccessType != "" {
safeEvent["access_type"] = pe.AccessType
}
return json.Marshal(safeEvent)
}
@@ -532,6 +537,17 @@ func (sr *ScriptRunner) ProcessEvent(evt *PolicyEvent) (
// Send the event JSON to the script (newline-terminated)
if _, err := stdin.Write(append(eventJSON, '\n')); chk.E(err) {
// Check if it's a broken pipe error, which means the script has died
if strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "closed pipe") {
log.E.F(
"policy script %s stdin closed (broken pipe) - script may have crashed or exited prematurely",
sr.scriptPath,
)
// Mark as not running so it will be restarted on next periodic check
sr.mutex.Lock()
sr.isRunning = false
sr.mutex.Unlock()
}
return nil, fmt.Errorf("failed to write event to script: %v", err)
}
@@ -541,6 +557,10 @@ func (sr *ScriptRunner) ProcessEvent(evt *PolicyEvent) (
log.D.S("response", response)
return &response, nil
case <-time.After(5 * time.Second):
log.W.F(
"policy script %s response timeout - script may not be responding correctly (check for debug output on stdout)",
sr.scriptPath,
)
return nil, fmt.Errorf("script response timeout")
case <-sr.ctx.Done():
return nil, fmt.Errorf("script context cancelled")
@@ -554,6 +574,7 @@ func (sr *ScriptRunner) readResponses() {
}
scanner := bufio.NewScanner(sr.stdout)
nonJSONLineCount := 0
for scanner.Scan() {
line := scanner.Text()
if line == "" {
@@ -562,10 +583,31 @@ func (sr *ScriptRunner) readResponses() {
log.D.F("policy response: %s", line)
var response PolicyResponse
if err := json.Unmarshal([]byte(line), &response); chk.E(err) {
log.E.F(
"failed to parse policy response from %s: %v", sr.scriptPath,
err,
)
// Check if this looks like debug output
if strings.HasPrefix(line, "{") {
// Looks like JSON but failed to parse
log.E.F(
"failed to parse policy response from %s: %v\nLine: %s",
sr.scriptPath, err, line,
)
} else {
// Definitely not JSON - probably debug output
nonJSONLineCount++
if nonJSONLineCount <= 3 {
log.W.F(
"policy script %s produced non-JSON output on stdout (should only output JSONL): %q",
sr.scriptPath, line,
)
log.W.F(
"IMPORTANT: Policy scripts must ONLY write JSON responses to stdout. Use stderr or a log file for debug output.",
)
} else if nonJSONLineCount == 4 {
log.W.F(
"policy script %s continues to produce non-JSON output - suppressing further warnings",
sr.scriptPath,
)
}
}
continue
}
@@ -593,7 +635,17 @@ func (sr *ScriptRunner) logOutput(stdout, stderr io.ReadCloser) {
// Only log stderr, stdout is used by readResponses
go func() {
io.Copy(os.Stderr, stderr)
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
line := scanner.Text()
if line != "" {
// Log script stderr output through relay logging system
log.I.F("[policy script %s] %s", sr.scriptPath, line)
}
}
if err := scanner.Err(); chk.E(err) {
log.E.F("error reading stderr from policy script %s: %v", sr.scriptPath, err)
}
}()
}
@@ -984,6 +1036,7 @@ func (p *P) checkScriptPolicy(
E: ev,
LoggedInPubkey: hex.Enc(loggedInPubkey),
IPAddress: ipAddress,
AccessType: access,
}
// Process event through policy script

View File

@@ -1 +1 @@
v0.27.4
v0.28.1

View File

@@ -19,34 +19,83 @@ test-docker-policy/
## What the Test Does
1. **Builds** an Ubuntu 22.04.5 Docker image with ORLY relay
2. **Configures** the policy engine with `cs-policy.js`
2. **Configures** the policy engine with `cs-policy-daemon.js`
3. **Starts** the relay with policy engine enabled
4. **Sends** a test event to the relay
5. **Verifies** that `cs-policy.js` created `/home/orly/cs-policy-output.txt`
6. **Reports** success or failure
4. **Publishes 2 events** to test write control (EVENT messages)
5. **Queries for those events** to test read control (REQ messages)
6. **Verifies** that:
- Both events were published successfully
- Events can be queried and retrieved
- Policy script processed both write and read operations
- Policy script logged to both file and relay log (stderr)
7. **Reports** detailed results with policy invocation counts
## How cs-policy.js Works
## How cs-policy-daemon.js Works
The policy script writes a timestamped message to `/home/orly/cs-policy-output.txt` each time it's executed:
The policy script is a long-lived process that:
1. Reads events from stdin (one JSON event per line)
2. Processes each event and returns a JSON response to stdout
3. Logs debug information to:
- `/home/orly/cs-policy-output.txt` (file output)
- stderr (appears in relay log with prefix `[policy script /path]`)
```javascript
#!/usr/bin/env node
const fs = require('fs')
const filePath = '/home/orly/cs-policy-output.txt'
if (fs.existsSync(filePath)) {
fs.appendFileSync(filePath, `${Date.now()}: Hey there!\n`)
} else {
fs.writeFileSync(filePath, `${Date.now()}: Hey there!\n`)
}
```
**Key Features:**
- Logs event details including kind, ID, and access type (read/write)
- Writes debug output to stderr which appears in the relay log
- Returns JSON responses to stdout for policy decisions
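
Because the protocol is plain JSONL over stdin/stdout, a policy daemon can be written in any language. Below is a minimal sketch in Go for comparison. The input fields (`id`, `access_type`) match what the relay sends, but the response shape shown here (`id` plus an `action` of `accept`) is an assumption for illustration; consult the relay's `PolicyResponse` type for the authoritative schema.

```go
// Minimal policy daemon sketch (hypothetical example, not shipped with ORLY).
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

func main() {
	// stderr lines appear in the relay log with the "[policy script ...]" prefix
	fmt.Fprintln(os.Stderr, "[go-policy] policy daemon started")
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		if len(scanner.Bytes()) == 0 {
			continue
		}
		var evt map[string]any
		if err := json.Unmarshal(scanner.Bytes(), &evt); err != nil {
			fmt.Fprintf(os.Stderr, "[go-policy] bad input: %v\n", err)
			continue
		}
		id, _ := evt["id"].(string)
		access, _ := evt["access_type"].(string)
		fmt.Fprintf(os.Stderr, "[go-policy] event %.8s access=%s\n", id, access)
		// stdout must carry exactly one JSON response per input line;
		// the "action" field name is an assumption, see PolicyResponse.
		resp, _ := json.Marshal(map[string]any{"id": id, "action": "accept"})
		fmt.Println(string(resp))
	}
}
```

As with the Node version, all diagnostics go to stderr so that stdout stays a clean JSONL channel.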
## Quick Start
Run the automated test:
```bash
./test-docker-policy/test-policy.sh
./scripts/docker-policy/test-policy.sh
```
## Policy Test Tool
The `policytest` tool is a command-line utility for testing policy enforcement:
```bash
# Test write control (EVENT messages)
./policytest -url ws://localhost:8777 -type event -kind 1
# Test read control (REQ messages)
./policytest -url ws://localhost:8777 -type req -kind 1
# Test both write and read control
./policytest -url ws://localhost:8777 -type both -kind 1
# Publish multiple events and query for them (full integration test)
./policytest -url ws://localhost:8777 -type publish-and-query -kind 1 -count 2
```
### Options
- `-url` - Relay WebSocket URL (default: `ws://127.0.0.1:3334`)
- `-type` - Test type:
- `event` - Test write control only
- `req` - Test read control only
- `both` - Test write then read
- `publish-and-query` - Publish events then query for them (full test)
- `-kind` - Event kind to test (default: `4678`)
- `-count` - Number of events to publish for `publish-and-query` (default: `2`)
- `-timeout` - Operation timeout (default: `20s`)
### Output
The `publish-and-query` test provides detailed output:
```
Publishing 2 events of kind 1...
Event 1/2 published successfully (id: a1b2c3d4...)
Event 2/2 published successfully (id: e5f6g7h8...)
PUBLISH: 2 accepted, 0 rejected out of 2 total
Querying for events of kind 1...
Query returned 2 events
QUERY: found 2/2 published events (total returned: 2)
SUCCESS: All published events were retrieved
```
## Manual Testing
@@ -135,15 +184,50 @@ docker exec orly-policy-test netstat -tlnp | grep 8777
When successful, you should see:
```
=== Step 9: Publishing 2 events and querying for them ===
--- Publishing and querying events ---
Publishing 2 events of kind 1...
Event 1/2 published successfully (id: abc12345...)
Event 2/2 published successfully (id: def67890...)
PUBLISH: 2 accepted, 0 rejected out of 2 total
Querying for events of kind 1...
Query returned 2 events
QUERY: found 2/2 published events (total returned: 2)
SUCCESS: All published events were retrieved
=== Step 10: Checking relay logs ===
INFO [policy script /home/orly/cs-policy-daemon.js] [cs-policy] Policy script started
INFO [policy script /home/orly/cs-policy-daemon.js] [cs-policy] Processing event abc12345, kind: 1, access: write
INFO [policy script /home/orly/cs-policy-daemon.js] [cs-policy] Processing event def67890, kind: 1, access: write
INFO [policy script /home/orly/cs-policy-daemon.js] [cs-policy] Processing event abc12345, kind: 1, access: read
INFO [policy script /home/orly/cs-policy-daemon.js] [cs-policy] Processing event def67890, kind: 1, access: read
=== Step 12: Checking output file ===
✓ SUCCESS: cs-policy-output.txt file exists!
Output file contents:
1704123456789: Hey there!
1234567890123: Policy script started
1234567890456: Event ID: abc12345..., Kind: 1, Access: write
1234567890789: Event ID: def67890..., Kind: 1, Access: write
1234567891012: Event ID: abc12345..., Kind: 1, Access: read
1234567891234: Event ID: def67890..., Kind: 1, Access: read
Policy script is working correctly!
Policy invocations summary:
- Write operations (EVENT): 2 (expected: 2)
- Read operations (REQ): 2 (expected: >=1)
✓ SUCCESS: Policy script processed both write and read operations!
- Published 2 events (write control)
- Queried events (read control)
```
Each line in the output file represents one execution of the policy script, with a Unix timestamp.
The test verifies:
- **Write Control**: Policy script processes EVENT messages (2 publications)
- **Read Control**: Policy script processes REQ messages (query retrieves events)
- **Dual Logging**: Script output appears in both file and relay log (stderr)
- **Event Lifecycle**: Events are stored and can be retrieved
## Configuration Files

View File

@@ -12,20 +12,27 @@ const rl = readline.createInterface({
terminal: false
});
// Log that script started
// Log that script started - to both file and stderr
fs.appendFileSync(filePath, `${Date.now()}: Policy script started\n`);
console.error('[cs-policy] Policy script started');
// Process each line of input (policy events)
rl.on('line', (line) => {
try {
// Log that we received an event
// Log that we received an event (to file)
fs.appendFileSync(filePath, `${Date.now()}: Received event: ${line.substring(0, 100)}...\n`);
// Parse the policy event
const event = JSON.parse(line);
// Log event details
fs.appendFileSync(filePath, `${Date.now()}: Event ID: ${event.id || 'unknown'}\n`);
// Log event details including access type
const accessType = event.access_type || 'unknown';
const eventKind = event.kind || 'unknown';
const eventId = event.id || 'unknown';
// Log to both file and stderr (stderr appears in relay log)
fs.appendFileSync(filePath, `${Date.now()}: Event ID: ${eventId}, Kind: ${eventKind}, Access: ${accessType}\n`);
console.error(`[cs-policy] Processing event ${eventId.substring(0, 8)}, kind: ${eventKind}, access: ${accessType}`);
// Respond with "accept" to allow the event
const response = {
@@ -36,8 +43,9 @@ rl.on('line', (line) => {
console.log(JSON.stringify(response));
} catch (err) {
// Log errors
// Log errors to both file and stderr
fs.appendFileSync(filePath, `${Date.now()}: Error: ${err.message}\n`);
console.error(`[cs-policy] Error processing event: ${err.message}`);
// Reject on error
console.log(JSON.stringify({
@@ -49,4 +57,5 @@ rl.on('line', (line) => {
rl.on('close', () => {
fs.appendFileSync(filePath, `${Date.now()}: Policy script stopped\n`);
console.error('[cs-policy] Policy script stopped');
});

View File

@@ -3,5 +3,5 @@ ORLY_APP_NAME="orly"
ORLY_PUBLIC_READABLE=true
ORLY_PRIVATE=false
ORLY_OWNERS=4db2c42f3c02079dd6feae3f88f6c8693940a00ade3cc8e5d72050bd6e577cd5
ORLY_LOG_LEVEL=debug
ORLY_LOG_LEVEL=trace
ORLY_POLICY_ENABLED=true

View File

@@ -29,33 +29,31 @@ cp "$REPO_ROOT/orly" "$SCRIPT_DIR/"
cp "$REPO_ROOT/pkg/crypto/p8k/libsecp256k1.so" "$SCRIPT_DIR/"
echo ""
echo -e "${YELLOW}Step 3: Building Docker image...${NC}"
echo -e "${YELLOW}Step 3: Cleaning up old containers...${NC}"
cd "$SCRIPT_DIR" && docker-compose down -v 2>/dev/null || true
echo ""
echo -e "${YELLOW}Step 4: Building Docker image...${NC}"
cd "$SCRIPT_DIR" && docker-compose build
echo ""
echo -e "${YELLOW}Step 4: Starting ORLY relay container...${NC}"
echo -e "${YELLOW}Step 5: Starting ORLY relay container...${NC}"
cd "$SCRIPT_DIR" && docker-compose up -d
echo ""
echo -e "${YELLOW}Step 5: Waiting for relay to start (15 seconds)...${NC}"
echo -e "${YELLOW}Step 6: Waiting for relay to start (15 seconds)...${NC}"
sleep 15
echo ""
echo -e "${YELLOW}Step 6: Checking relay logs...${NC}"
echo -e "${YELLOW}Step 7: Checking relay logs...${NC}"
docker logs orly-policy-test 2>&1 | tail -20
echo ""
echo -e "${YELLOW}Step 7: Sending test event to relay...${NC}"
echo -e "${YELLOW}Step 8: Building policytest tool...${NC}"
cd "$REPO_ROOT" && CGO_ENABLED=0 go build -o policytest ./cmd/policytest
# Install websocat if not available
if ! command -v websocat &> /dev/null; then
echo "websocat not found. Installing..."
wget -qO- https://github.com/vi/websocat/releases/download/v1.12.0/websocat.x86_64-unknown-linux-musl -O /tmp/websocat
chmod +x /tmp/websocat
WEBSOCAT="/tmp/websocat"
else
WEBSOCAT="websocat"
fi
echo ""
echo -e "${YELLOW}Step 9: Publishing 2 events and querying for them...${NC}"
# Check which port the relay is listening on
RELAY_PORT=$(docker logs orly-policy-test 2>&1 | grep "starting listener" | grep -oP ':\K[0-9]+' | head -1)
@@ -64,20 +62,22 @@ if [ -z "$RELAY_PORT" ]; then
fi
echo "Relay is listening on port: $RELAY_PORT"
# Generate a test event with a properly formatted (but invalid) signature
# The policy script should still receive this event even if validation fails
TIMESTAMP=$(date +%s)
TEST_EVENT='["EVENT",{"id":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa","pubkey":"4db2c42f3c02079dd6feae3f88f6c8693940a00ade3cc8e5d72050bd6e577cd5","created_at":'$TIMESTAMP',"kind":1,"tags":[],"content":"Test event for policy validation","sig":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}]'
echo "Sending test event..."
echo "$TEST_EVENT" | timeout 5 $WEBSOCAT ws://localhost:$RELAY_PORT 2>&1 || echo "Connection attempt completed"
# Test publish and query - this will publish 2 events and query for them
cd "$REPO_ROOT"
echo ""
echo "--- Publishing and querying events ---"
./policytest -url "ws://localhost:$RELAY_PORT" -type publish-and-query -kind 1 -count 2 2>&1
echo ""
echo -e "${YELLOW}Step 8: Waiting for policy script to execute (5 seconds)...${NC}"
sleep 5
echo -e "${YELLOW}Step 10: Checking relay logs...${NC}"
docker logs orly-policy-test 2>&1 | tail -20
echo ""
echo -e "${YELLOW}Step 9: Checking if cs-policy.js created output file...${NC}"
echo -e "${YELLOW}Step 11: Waiting for policy script to process (3 seconds)...${NC}"
sleep 3
echo ""
echo -e "${YELLOW}Step 12: Checking if cs-policy.js created output file...${NC}"
# Check if the output file exists in the container
if docker exec orly-policy-test test -f /home/orly/cs-policy-output.txt; then
@@ -86,8 +86,35 @@ if docker exec orly-policy-test test -f /home/orly/cs-policy-output.txt; then
echo "Output file contents:"
docker exec orly-policy-test cat /home/orly/cs-policy-output.txt
echo ""
echo -e "${GREEN}✓ Policy script is working correctly!${NC}"
EXIT_CODE=0
# Check if we see both read and write access types
WRITE_COUNT=$(docker exec orly-policy-test cat /home/orly/cs-policy-output.txt | grep -c "Access: write" || true)
READ_COUNT=$(docker exec orly-policy-test cat /home/orly/cs-policy-output.txt | grep -c "Access: read" || true)
echo "Policy invocations summary:"
echo " - Write operations (EVENT): $WRITE_COUNT (expected: 2)"
echo " - Read operations (REQ): $READ_COUNT (expected: >=1)"
echo ""
# Analyze results
if [ "$WRITE_COUNT" -ge 2 ] && [ "$READ_COUNT" -ge 1 ]; then
echo -e "${GREEN}✓ SUCCESS: Policy script processed both write and read operations!${NC}"
echo -e "${GREEN} - Published 2 events (write control)${NC}"
echo -e "${GREEN} - Queried events (read control)${NC}"
EXIT_CODE=0
elif [ "$WRITE_COUNT" -gt 0 ] && [ "$READ_COUNT" -gt 0 ]; then
echo -e "${YELLOW}⚠ PARTIAL: Policy invoked but counts don't match expected${NC}"
echo -e "${YELLOW} - Write count: $WRITE_COUNT (expected 2)${NC}"
echo -e "${YELLOW} - Read count: $READ_COUNT (expected >=1)${NC}"
EXIT_CODE=0
elif [ "$WRITE_COUNT" -gt 0 ]; then
echo -e "${YELLOW}⚠ WARNING: Policy script only processed write operations${NC}"
echo -e "${YELLOW} Read operations may not have been tested or logged${NC}"
EXIT_CODE=0
else
echo -e "${YELLOW}⚠ WARNING: Policy script is working but access types may not be logged correctly${NC}"
EXIT_CODE=0
fi
else
echo -e "${RED}✗ FAILURE: cs-policy-output.txt file not found!${NC}"
echo ""
@@ -97,7 +124,7 @@ else
fi
echo ""
echo -e "${YELLOW}Step 10: Additional debugging info...${NC}"
echo -e "${YELLOW}Step 13: Additional debugging info...${NC}"
echo "Files in /home/orly directory:"
docker exec orly-policy-test ls -la /home/orly/