Improve syncing

This commit is contained in:
Jon Staab
2025-10-28 11:29:59 -07:00
parent 2e8678e4c6
commit 7502004aba
3 changed files with 233 additions and 141 deletions

View File

@@ -38,7 +38,7 @@
import SocketStatusIndicator from "@app/components/SocketStatusIndicator.svelte"
import {
ENABLE_ZAPS,
MESSAGE_FILTER,
CONTENT_KINDS,
deriveSpaceMembers,
deriveEventsForUrl,
deriveUserRooms,
@@ -65,7 +65,7 @@
const hasAlerts = $derived($alerts.some(a => getTagValue("feed", a.tags)?.includes(url)))
const spaceKinds = derived(
deriveEventsForUrl(url, [MESSAGE_FILTER]),
deriveEventsForUrl(url, [{kinds: CONTENT_KINDS}]),
$events => new Set($events.map(e => e.kind)),
)

View File

@@ -310,20 +310,9 @@ if (ENABLE_ZAPS) {
REACTION_KINDS.push(ZAP_RESPONSE)
}
export const MESSAGE_KINDS = [ZAP_GOAL, EVENT_TIME, THREAD, MESSAGE]
export const CONTENT_KINDS = [ZAP_GOAL, EVENT_TIME, THREAD]
export const MESSAGE_FILTER = {kinds: MESSAGE_KINDS}
export const COMMENT_FILTER = makeCommentFilter(MESSAGE_KINDS)
export const MEMBERSHIP_KINDS = [
ROOM_ADD_MEMBER,
ROOM_REMOVE_MEMBER,
RELAY_ADD_MEMBER,
RELAY_REMOVE_MEMBER,
]
export const MEMBERSHIP_FILTER = {kinds: MEMBERSHIP_KINDS}
export const MESSAGE_KINDS = [...CONTENT_KINDS, MESSAGE]
// Settings

View File

@@ -1,23 +1,12 @@
import {page} from "$app/stores"
import type {Unsubscriber} from "svelte/store"
import {derived, get} from "svelte/store"
import {
partition,
call,
sortBy,
assoc,
chunk,
sleep,
now,
identity,
WEEK,
MONTH,
ago,
} from "@welshman/lib"
import {partition, call, sortBy, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib"
import {
getListTags,
getRelayTagValues,
WRAP,
MESSAGE,
ROOM_META,
ROOM_DELETE,
ROOM_ADMINS,
@@ -26,6 +15,8 @@ import {
ROOM_REMOVE_MEMBER,
ROOM_CREATE_PERMISSION,
RELAY_MEMBERS,
RELAY_ADD_MEMBER,
RELAY_REMOVE_MEMBER,
isSignedEvent,
} from "@welshman/util"
import type {Filter, TrustedEvent} from "@welshman/util"
@@ -45,19 +36,22 @@ import {
repository,
shouldUnwrap,
hasNegentropy,
relaysByUrl,
} from "@welshman/app"
import {
MESSAGE_FILTER,
COMMENT_FILTER,
MEMBERSHIP_FILTER,
CONTENT_KINDS,
INDEXER_RELAYS,
REACTION_KINDS,
loadSettings,
loadGroupSelections,
userSpaceUrls,
userGroupSelections,
bootstrapPubkeys,
decodeRelay,
getUrlsForEvent,
hasNip29,
getSpaceUrlsFromGroupSelections,
getSpaceRoomsFromGroupSelections,
makeCommentFilter,
} from "@app/core/state"
import {loadAlerts, loadAlertStatuses} from "@app/core/requests"
import {hasBlossomSupport} from "@app/core/commands"
@@ -70,7 +64,7 @@ type PullOpts = {
signal: AbortSignal
}
const pullConservatively = ({relays, filters, signal}: PullOpts) => {
const pullWithFallback = ({relays, filters, signal}: PullOpts) => {
const $getUrlsForEvent = get(getUrlsForEvent)
const [smart, dumb] = partition(hasNegentropy, relays)
const events = repository.query(filters, {shouldSort: false}).filter(isSignedEvent)
@@ -91,6 +85,11 @@ const pullConservatively = ({relays, filters, signal}: PullOpts) => {
return Promise.all(promises)
}
const pullAndListen = ({relays, filters, signal}: PullOpts) => {
pullWithFallback({relays, signal, filters: filters.map(assoc("limit", 30))})
request({relays, signal, filters: filters.map(assoc("limit", 0))})
}
// Relays
const syncRelays = () => {
@@ -121,10 +120,80 @@ const syncRelays = () => {
// User data
const syncUserSpaceMembership = (url: string) => {
const $pubkey = pubkey.get()
const controller = new AbortController()
if ($pubkey) {
pullAndListen({
relays: [url],
signal: controller.signal,
filters: [
{
kinds: [RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER, ROOM_CREATE_PERMISSION],
"#p": [$pubkey],
},
],
})
}
return () => controller.abort()
}
const syncUserRoomMembership = (url: string, room: string) => {
const $pubkey = pubkey.get()
const controller = new AbortController()
if ($pubkey) {
pullAndListen({
relays: [url],
signal: controller.signal,
filters: [
{
kinds: [ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER],
"#p": [$pubkey],
"#h": [room],
},
],
})
}
return () => controller.abort()
}
const syncUserData = () => {
const unsubscribePubkey = pubkey.subscribe($pubkey => {
const unsubscribersByKey = new Map<string, Unsubscriber>()
const unsubscribeGroupSelections = userGroupSelections.subscribe($l => {
const $pubkey = pubkey.get()
if ($pubkey) {
loadRelaySelections($pubkey)
const keys = new Set<string>()
for (const url of getSpaceUrlsFromGroupSelections($l)) {
if (!unsubscribersByKey.has(url)) {
unsubscribersByKey.set(url, syncUserSpaceMembership(url))
}
keys.add(url)
for (const room of getSpaceRoomsFromGroupSelections(url, $l)) {
const key = `${url}'${room}`
if (!unsubscribersByKey.has(key)) {
unsubscribersByKey.set(key, syncUserRoomMembership(url, room))
}
keys.add(key)
}
}
for (const [key, unsubscribe] of unsubscribersByKey.entries()) {
if (!keys.has(key)) {
unsubscribersByKey.delete(key)
unsubscribe()
}
}
}
})
@@ -148,154 +217,194 @@ const syncUserData = () => {
// This isn't urgent, avoid clogging other stuff up
await sleep(1000)
for (const pk of pubkeys) {
loadRelaySelections(pk).then(() => {
loadGroupSelections(pk)
loadProfile(pk)
loadFollows(pk)
loadMutes(pk)
})
}
await Promise.all(
pubkeys.map(async pk => {
await loadRelaySelections(pk)
await loadGroupSelections(pk)
await loadProfile(pk)
await loadFollows(pk)
await loadMutes(pk)
}),
)
}
})
const unsubscribePubkey = pubkey.subscribe($pubkey => {
if ($pubkey) {
loadRelaySelections($pubkey)
}
})
return () => {
unsubscribePubkey()
unsubscribersByKey.forEach(call)
unsubscribeGroupSelections()
unsubscribeSelections()
unsubscribeFollows()
unsubscribePubkey()
}
}
// Memberships
const syncMembership = (url: string) => {
const controller = new AbortController()
const relayFilter = {kinds: [RELAY_MEMBERS, ROOM_CREATE_PERMISSION]}
const roomsFilter = {kinds: [ROOM_ADMINS, ROOM_MEMBERS, ROOM_META, ROOM_DELETE]}
// Load group metadata and member lists
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [relayFilter, roomsFilter],
})
// Load historical data from up to a month ago for quick page loading
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [MESSAGE_FILTER, COMMENT_FILTER, MEMBERSHIP_FILTER].map(assoc("since", ago(MONTH))),
})
// Listen for new events
request({
relays: [url],
signal: controller.signal,
filters: [relayFilter, roomsFilter, MESSAGE_FILTER, COMMENT_FILTER, MEMBERSHIP_FILTER].map(
assoc("since", now()),
),
})
return () => controller.abort()
}
const syncMemberships = () => {
const unsubscribersByUrl = new Map<string, Unsubscriber>()
const unsubscribeSpaceUrls = userSpaceUrls.subscribe(urls => {
// stop syncing removed spaces
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
if (!urls.includes(url)) {
unsubscribersByUrl.delete(url)
unsubscribe()
}
}
// Start syncing newly added spaces
for (const url of urls) {
if (!unsubscribersByUrl.has(url)) {
unsubscribersByUrl.set(url, syncMembership(url))
}
}
})
return () => {
Array.from(unsubscribersByUrl.values()).forEach(call)
unsubscribeSpaceUrls()
}
}
// Sync extra stuff for the current space
// Spaces
const syncSpace = (url: string) => {
const $pubkey = pubkey.get()
const controller = new AbortController()
// Load all membership changes for the current user
if ($pubkey) {
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [
{
kinds: [ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER],
"#p": [$pubkey],
},
],
})
}
// Listen actively for all current membership changes, reports, reactions, zaps, etc
request({
pullAndListen({
relays: [url],
signal: controller.signal,
filters: [
{
kinds: [ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER, ...REACTION_KINDS],
since: now(),
},
{kinds: [RELAY_MEMBERS]},
{kinds: [RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]},
...CONTENT_KINDS.map(kind => ({kinds: [kind]})),
makeCommentFilter(CONTENT_KINDS),
],
})
return () => controller.abort()
}
const syncCurrentSpace = () => {
const unsubscribersByUrl = new Map<string, Unsubscriber>()
const syncSpaces = () => {
const membershipUnsubscribersByUrl = new Map<string, Unsubscriber>()
const unsubscribeSpaceUrls = userSpaceUrls.subscribe(urls => {
// stop syncing removed spaces
for (const [url, unsubscribe] of membershipUnsubscribersByUrl.entries()) {
if (!urls.includes(url)) {
membershipUnsubscribersByUrl.delete(url)
unsubscribe()
}
}
// Start syncing newly added spaces
for (const url of urls) {
if (!membershipUnsubscribersByUrl.has(url)) {
membershipUnsubscribersByUrl.set(url, syncSpace(url))
}
}
})
const pageUnsubscribersByUrl = new Map<string, Unsubscriber>()
// Sync the space the user is currently visiting
const unsubscribePage = page.subscribe($page => {
if ($page.params.relay) {
const url = decodeRelay($page.params.relay)
if (!unsubscribersByUrl.has(url)) {
unsubscribersByUrl.set(url, syncSpace(url))
// Don't subscribe twice if the user is a member
if (!pageUnsubscribersByUrl.has(url) && !get(userSpaceUrls).includes(url)) {
pageUnsubscribersByUrl.set(url, syncSpace(url))
}
for (const [oldUrl, unsubscribe] of unsubscribersByUrl.entries()) {
// Clean up old subscriptions
for (const [oldUrl, unsubscribe] of pageUnsubscribersByUrl.entries()) {
if (url !== oldUrl) {
unsubscribersByUrl.delete(oldUrl)
pageUnsubscribersByUrl.delete(oldUrl)
unsubscribe()
}
}
} else {
Array.from(unsubscribersByUrl.values()).forEach(call)
Array.from(pageUnsubscribersByUrl.values()).forEach(call)
}
})
return () => {
Array.from(unsubscribersByUrl.values()).forEach(call)
Array.from(membershipUnsubscribersByUrl.values()).forEach(call)
Array.from(pageUnsubscribersByUrl.values()).forEach(call)
unsubscribeSpaceUrls()
unsubscribePage()
}
}
// Chat
const syncSpaceChat = (url: string) => {
const controller = new AbortController()
console.log(url)
pullAndListen({
relays: [url],
signal: controller.signal,
filters: [{kinds: [MESSAGE]}],
})
return () => controller.abort()
}
const syncRoomChat = (url: string, room: string) => {
const controller = new AbortController()
pullAndListen({
relays: [url],
signal: controller.signal,
filters: [
{kinds: [ROOM_ADMINS, ROOM_MEMBERS, ROOM_META], "#d": [room]},
{kinds: [ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER], "#h": [room]},
{kinds: [ROOM_DELETE], "#h": [room]},
{kinds: [MESSAGE], "#h": [room]},
],
})
return () => controller.abort()
}
const syncRooms = () => {
const unsubscribersByKey = new Map<string, Unsubscriber>()
const unsubscribeSpaceUrls = derived([userGroupSelections, relaysByUrl], identity).subscribe(
([$l, $relaysByUrl]) => {
const keys = new Set<string>()
const newUnsubscribersByKey = new Map<string, Unsubscriber>()
// Add new subscriptions, depending on whether nip 29 is supported
for (const url of getRelayTagValues(getListTags($l))) {
if (hasNip29($relaysByUrl.get(url))) {
for (const room of getSpaceRoomsFromGroupSelections(url, $l)) {
const id = `${url}'${room}`
if (!unsubscribersByKey.has(id)) {
newUnsubscribersByKey.set(url, syncRoomChat(url, room))
}
keys.add(id)
}
} else {
if (!unsubscribersByKey.has(url)) {
newUnsubscribersByKey.set(url, syncSpaceChat(url))
}
keys.add(url)
}
}
// Stop syncing removed selections
for (const [key, unsubscribe] of unsubscribersByKey.entries()) {
if (!keys.has(key)) {
unsubscribersByKey.delete(key)
unsubscribe()
}
}
// Start syncing newly added spaces
for (const [key, unsubscriber] of newUnsubscribersByKey.entries()) {
unsubscribersByKey.set(key, unsubscriber)
}
},
)
return () => {
Array.from(unsubscribersByKey.values()).forEach(call)
unsubscribeSpaceUrls()
}
}
// DMs
const syncDMRelay = (url: string, pubkey: string) => {
const controller = new AbortController()
// Load historical data
pullConservatively({
pullWithFallback({
relays: [url],
signal: controller.signal,
filters: [{kinds: [WRAP], "#p": [pubkey], until: ago(WEEK, 2)}],
@@ -378,13 +487,7 @@ const syncDMs = () => {
// Merge all synchronization functions
export const syncApplicationData = () => {
const unsubscribers = [
syncRelays(),
syncUserData(),
syncMemberships(),
syncCurrentSpace(),
syncDMs(),
]
const unsubscribers = [syncRelays(), syncUserData(), syncSpaces(), syncRooms(), syncDMs()]
return () => unsubscribers.forEach(call)
}