Get rid of subscribePersistent

This commit is contained in:
Jon Staab
2024-12-16 16:14:51 -08:00
parent fe789c461d
commit 7ddc1657ad
4 changed files with 23 additions and 51 deletions

View File

@@ -1,8 +1,6 @@
import type {Unsubscriber} from "svelte/store"
import {sleep, partition, assoc, now} from "@welshman/lib"
import {partition, assoc, now} from "@welshman/lib"
import {MESSAGE, REACTION, DELETE, THREAD, COMMENT} from "@welshman/util"
import type {SubscribeRequestWithHandlers, Subscription} from "@welshman/net"
import {SubscriptionEvent} from "@welshman/net"
import type {Subscription} from "@welshman/net"
import type {AppSyncOpts} from "@welshman/app"
import {subscribe, repository, load, pull, hasNegentropy} from "@welshman/app"
import {userRoomsByUrl, LEGACY_MESSAGE, GENERAL, getEventsForUrl} from "@app/state"
@@ -28,38 +26,11 @@ export const pullConservatively = ({relays, filters}: AppSyncOpts) => {
return Promise.all(promises)
}
export const subscribePersistent = (request: SubscribeRequestWithHandlers) => {
let sub: Subscription
let done = false
const start = async () => {
// If the subscription gets closed quickly, don't start flapping
await Promise.all([
sleep(30_000),
new Promise(resolve => {
sub = subscribe(request)
sub.emitter.on(SubscriptionEvent.Complete, resolve)
}),
])
if (!done) {
start()
}
}
start()
return () => {
done = true
sub?.close()
}
}
// Application requests
export const listenForNotifications = () => {
const since = now()
const unsubscribers: Unsubscriber[] = []
const subs: Subscription[] = []
for (const [url, rooms] of userRoomsByUrl.get()) {
load({
@@ -71,8 +42,8 @@ export const listenForNotifications = () => {
],
})
unsubscribers.push(
subscribePersistent({
subs.push(
subscribe({
relays: [url],
filters: [
{kinds: [THREAD], since},
@@ -84,8 +55,8 @@ export const listenForNotifications = () => {
}
return () => {
for (const unsubscribe of unsubscribers) {
unsubscribe()
for (const sub of subs) {
sub.close()
}
}
}
@@ -103,5 +74,5 @@ export const listenForChannelMessages = (url: string, room: string) => {
pullConservatively({relays, filters: [{kinds, "#h": [room]}]})
// Listen for new messages
return subscribePersistent({relays, filters: [{kinds, "#h": [room], since}]})
return subscribe({relays, filters: [{kinds, "#h": [room], since}]})
}

View File

@@ -35,6 +35,7 @@
signer,
dropSession,
getRelayUrls,
subscribe,
userInboxRelaySelections,
} from "@welshman/app"
import * as lib from "@welshman/lib"
@@ -49,7 +50,7 @@
import {theme} from "@app/theme"
import {INDEXER_RELAYS, userMembership, ensureUnwrapped, canDecrypt} from "@app/state"
import {loadUserData} from "@app/commands"
import {subscribePersistent, listenForNotifications} from "@app/requests"
import {listenForNotifications} from "@app/requests"
import * as commands from "@app/commands"
import * as requests from "@app/requests"
import * as notifications from "@app/notifications"
@@ -184,22 +185,22 @@
}
// Listen for space data, populate space-based notifications
let unsubSpaces: any
let spacesSub: any
userMembership.subscribe($membership => {
unsubSpaces?.()
unsubSpaces = listenForNotifications()
spacesSub?.close()
spacesSub = listenForNotifications()
})
// Listen for chats, populate chat-based notifications
let unsubChats: any
let chatsSub: any
derived([pubkey, userInboxRelaySelections], identity).subscribe(
([$pubkey, $userInboxRelaySelections]) => {
unsubChats?.()
chatsSub?.close()
if ($pubkey) {
unsubChats = subscribePersistent({
chatsSub = subscribe({
filters: [
{kinds: [WRAP], "#p": [$pubkey], since: ago(WEEK, 2)},
{kinds: [WRAP], "#p": [$pubkey], limit: 100},

View File

@@ -9,6 +9,7 @@
import type {TrustedEvent, EventContent} from "@welshman/util"
import {throttled} from "@welshman/store"
import {createEvent, MESSAGE} from "@welshman/util"
import type {Subscription} from "@welshman/net"
import {formatTimestampAsDate, publishThunk, deriveRelay} from "@welshman/app"
import {slide} from "@lib/transition"
import {createScroller, type Scroller} from "@lib/html"
@@ -92,7 +93,7 @@
let limit = 30
let loading = sleep(5000)
let unsub: () => void
let sub: Subscription
let element: HTMLElement
let scroller: Scroller
let editor: Readable<Editor>
@@ -139,13 +140,13 @@
},
})
unsub = listenForChannelMessages(url, room)
sub = listenForChannelMessages(url, room)
})
onDestroy(() => {
setChecked($page.url.pathname)
scroller?.stop()
unsub?.()
sub?.close()
})
</script>

View File

@@ -3,7 +3,7 @@
import {page} from "$app/stores"
import {sortBy, nthEq, sleep} from "@welshman/lib"
import {COMMENT} from "@welshman/util"
import {repository} from "@welshman/app"
import {repository, subscribe} from "@welshman/app"
import {deriveEvents} from "@welshman/store"
import Icon from "@lib/components/Icon.svelte"
import PageBar from "@lib/components/PageBar.svelte"
@@ -15,7 +15,6 @@
import ThreadActions from "@app/components/ThreadActions.svelte"
import ThreadReply from "@app/components/ThreadReply.svelte"
import {deriveEvent, decodeRelay} from "@app/state"
import {subscribePersistent} from "@app/requests"
import {setChecked} from "@app/notifications"
const {relay, id} = $page.params
@@ -44,10 +43,10 @@
$: title = $event?.tags.find(nthEq(0, "title"))?.[1] || ""
onMount(() => {
const unsub = subscribePersistent({relays: [url], filters})
const sub = subscribe({relays: [url], filters})
return () => {
unsub()
sub.close()
setChecked($page.url.pathname)
}
})