mirror of
https://github.com/coracle-social/flotilla.git
synced 2025-12-11 03:17:02 +00:00
Remove shards entirely, fix setup in layout
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
import {prop, first, call, on, groupBy, throttle, fromPairs, batch, sortBy, concat} from "@welshman/lib"
|
import {prop, call, on, throttle, fromPairs, batch} from "@welshman/lib"
|
||||||
import {throttled, freshness} from "@welshman/store"
|
import {throttled, freshness} from "@welshman/store"
|
||||||
import {
|
import {
|
||||||
PROFILE,
|
PROFILE,
|
||||||
@@ -96,42 +96,23 @@ const syncEvents = async () => {
|
|||||||
repository,
|
repository,
|
||||||
"update",
|
"update",
|
||||||
batch(3000, async (updates: RepositoryUpdate[]) => {
|
batch(3000, async (updates: RepositoryUpdate[]) => {
|
||||||
let added: TrustedEvent[] = []
|
const add: TrustedEvent[] = []
|
||||||
const removed = new Set<string>()
|
const remove = new Set<string>()
|
||||||
|
|
||||||
for (const update of updates) {
|
for (const update of updates) {
|
||||||
for (const event of update.added) {
|
for (const event of update.added) {
|
||||||
if (rankEvent(event) > 0) {
|
if (rankEvent(event) > 0) {
|
||||||
added.push(event)
|
add.push(event)
|
||||||
removed.delete(event.id)
|
remove.delete(event.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const id of update.removed) {
|
for (const id of update.removed) {
|
||||||
removed.add(id)
|
remove.add(id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (removed.size > 0) {
|
await collection.update({add, remove})
|
||||||
added = added.filter(e => !removed.has(e.id))
|
|
||||||
|
|
||||||
const removedByShard = groupBy(id => collection.getShardId(id), removed)
|
|
||||||
const addedByShard = groupBy(collection.getShardIdFromItem, added)
|
|
||||||
const shards = new Set([...removedByShard.keys(), ...addedByShard.keys()])
|
|
||||||
|
|
||||||
for (const shard of shards) {
|
|
||||||
const removedInShard = removedByShard.get(shard)
|
|
||||||
const addedInShard = addedByShard.get(shard) || []
|
|
||||||
const current = await collection.getShard(shard)
|
|
||||||
const filtered = current.filter(e => !removedInShard?.includes(e.id))
|
|
||||||
const sorted = sortBy(e => -rankEvent(e), concat(filtered, addedInShard))
|
|
||||||
const pruned = sorted.slice(0, 1000)
|
|
||||||
|
|
||||||
await collection.setShard(shard, pruned)
|
|
||||||
}
|
|
||||||
} else if (added.length > 0) {
|
|
||||||
await collection.add(added)
|
|
||||||
}
|
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -139,7 +120,10 @@ const syncEvents = async () => {
|
|||||||
type TrackerItem = [string, string[]]
|
type TrackerItem = [string, string[]]
|
||||||
|
|
||||||
const syncTracker = async () => {
|
const syncTracker = async () => {
|
||||||
const collection = new Collection<TrackerItem>({table: "tracker", getId: first})
|
const collection = new Collection<TrackerItem>({
|
||||||
|
table: "tracker",
|
||||||
|
getId: (item: TrackerItem) => item[0],
|
||||||
|
})
|
||||||
|
|
||||||
const relaysById = new Map<string, Set<string>>()
|
const relaysById = new Map<string, Set<string>>()
|
||||||
|
|
||||||
@@ -199,7 +183,10 @@ const syncZappers = async () => {
|
|||||||
type FreshnessItem = [string, number]
|
type FreshnessItem = [string, number]
|
||||||
|
|
||||||
const syncFreshness = async () => {
|
const syncFreshness = async () => {
|
||||||
const collection = new Collection<FreshnessItem>({table: "freshness", getId: first})
|
const collection = new Collection<FreshnessItem>({
|
||||||
|
table: "freshness",
|
||||||
|
getId: (item: FreshnessItem) => item[0],
|
||||||
|
})
|
||||||
|
|
||||||
freshness.set(fromPairs(await collection.get()))
|
freshness.set(fromPairs(await collection.get()))
|
||||||
|
|
||||||
@@ -211,7 +198,10 @@ const syncFreshness = async () => {
|
|||||||
type PlaintextItem = [string, string]
|
type PlaintextItem = [string, string]
|
||||||
|
|
||||||
const syncPlaintext = async () => {
|
const syncPlaintext = async () => {
|
||||||
const collection = new Collection<PlaintextItem>({table: "plaintext", getId: first})
|
const collection = new Collection<PlaintextItem>({
|
||||||
|
table: "plaintext",
|
||||||
|
getId: (item: PlaintextItem) => item[0],
|
||||||
|
})
|
||||||
|
|
||||||
plaintext.set(fromPairs(await collection.get()))
|
plaintext.set(fromPairs(await collection.get()))
|
||||||
|
|
||||||
@@ -239,16 +229,15 @@ const syncWrapManager = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const syncDataStores = async () => {
|
export const syncDataStores = async () => {
|
||||||
const t = Date.now()
|
|
||||||
const unsubscribers = await Promise.all([
|
const unsubscribers = await Promise.all([
|
||||||
syncEvents().then(f => console.log("syncEvents", Date.now() - t) || f),
|
syncEvents(),
|
||||||
syncTracker().then(f => console.log("syncTracker", Date.now() - t) || f),
|
syncTracker(),
|
||||||
syncRelays().then(f => console.log("syncRelays", Date.now() - t) || f),
|
syncRelays(),
|
||||||
syncHandles().then(f => console.log("syncHandles", Date.now() - t) || f),
|
syncHandles(),
|
||||||
syncZappers().then(f => console.log("syncZappers", Date.now() - t) || f),
|
syncZappers(),
|
||||||
syncFreshness().then(f => console.log("syncFreshness", Date.now() - t) || f),
|
syncFreshness(),
|
||||||
syncPlaintext().then(f => console.log("syncPlaintext", Date.now() - t) || f),
|
syncPlaintext(),
|
||||||
syncWrapManager().then(f => console.log("syncWrapManager", Date.now() - t) || f),
|
syncWrapManager(),
|
||||||
])
|
])
|
||||||
|
|
||||||
return () => unsubscribers.forEach(call)
|
return () => unsubscribers.forEach(call)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import {hash, range, reject, flatten, identity, groupBy} from "@welshman/lib"
|
import {reject, identity} from "@welshman/lib"
|
||||||
import {type StorageProvider} from "@welshman/store"
|
import {type StorageProvider} from "@welshman/store"
|
||||||
import {Preferences} from "@capacitor/preferences"
|
import {Preferences} from "@capacitor/preferences"
|
||||||
import {Encoding, Filesystem, Directory} from "@capacitor/filesystem"
|
import {Encoding, Filesystem, Directory} from "@capacitor/filesystem"
|
||||||
@@ -37,8 +37,6 @@ export type CollectionOptions<T> = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export class Collection<T> {
|
export class Collection<T> {
|
||||||
#shardCount = 1000
|
|
||||||
|
|
||||||
constructor(readonly options: CollectionOptions<T>) {}
|
constructor(readonly options: CollectionOptions<T>) {}
|
||||||
|
|
||||||
static clearAll = async (): Promise<void> => {
|
static clearAll = async (): Promise<void> => {
|
||||||
@@ -57,18 +55,12 @@ export class Collection<T> {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
getShardIds = () => Array.from(range(0, this.#shardCount))
|
#path = () => `collection_${this.options.table}.json`
|
||||||
|
|
||||||
getShardId = (id: string) => String(hash(id) % this.#shardCount)
|
get = async (): Promise<T[]> => {
|
||||||
|
|
||||||
getShardIdFromItem = (item: T) => this.getShardId(this.options.getId(item))
|
|
||||||
|
|
||||||
#path = (shard: string) => `collection_${this.options.table}_${shard}.json`
|
|
||||||
|
|
||||||
getShard = async (shard: string): Promise<T[]> => {
|
|
||||||
try {
|
try {
|
||||||
const file = await Filesystem.readFile({
|
const file = await Filesystem.readFile({
|
||||||
path: this.#path(shard),
|
path: this.#path(),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
})
|
})
|
||||||
@@ -81,48 +73,36 @@ export class Collection<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
get = async (): Promise<T[]> => flatten(await Promise.all(this.getShardIds().map(id => this.getShard(id))))
|
set = (items: T[]) =>
|
||||||
|
|
||||||
setShard = async (shard: string, items: T[]) =>
|
|
||||||
Filesystem.writeFile({
|
Filesystem.writeFile({
|
||||||
path: this.#path(shard),
|
path: this.#path(),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
data: items.map(v => JSON.stringify(v)).join("\n"),
|
data: items.map(v => JSON.stringify(v)).join("\n"),
|
||||||
})
|
})
|
||||||
|
|
||||||
set = (items: T[]) =>
|
add = (items: T[]) =>
|
||||||
Promise.all(
|
|
||||||
Array.from(groupBy(this.getShardIdFromItem, items)).map(([shard, chunk]) =>
|
|
||||||
this.setShard(shard, chunk),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
addToShard = (shard: string, items: T[]) =>
|
|
||||||
Filesystem.appendFile({
|
Filesystem.appendFile({
|
||||||
path: this.#path(shard),
|
path: this.#path(),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
data: "\n" + items.map(v => JSON.stringify(v)).join("\n"),
|
data: "\n" + items.map(v => JSON.stringify(v)).join("\n"),
|
||||||
})
|
})
|
||||||
|
|
||||||
add = (items: T[]) =>
|
remove = async (ids: Set<string>) =>
|
||||||
Promise.all(
|
this.set(reject(item => ids.has(this.options.getId(item)), await this.get()))
|
||||||
Array.from(groupBy(this.getShardIdFromItem, items)).map(([shard, chunk]) =>
|
|
||||||
this.addToShard(shard, chunk),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
removeFromShard = async (shard: string, ids: Set<string>) =>
|
update = async ({add, remove}: {add?: T[]; remove?: Set<string>}) => {
|
||||||
this.setShard(
|
if (remove && remove.size > 0) {
|
||||||
shard,
|
const items = reject(item => remove.has(this.options.getId(item)), await this.get())
|
||||||
reject(item => ids.has(this.options.getId(item)), await this.getShard(shard)),
|
|
||||||
)
|
|
||||||
|
|
||||||
remove = (ids: Iterable<string>) =>
|
if (add) {
|
||||||
Promise.all(
|
items.push(...add)
|
||||||
Array.from(groupBy(this.getShardId, ids)).map(([shard, chunk]) =>
|
}
|
||||||
this.removeFromShard(shard, new Set(chunk)),
|
|
||||||
),
|
await this.set(items)
|
||||||
)
|
} else if (add && add.length > 0) {
|
||||||
|
await this.add(add)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
import "@capacitor-community/safe-area"
|
import "@capacitor-community/safe-area"
|
||||||
import {throttle} from "throttle-debounce"
|
import {throttle} from "throttle-debounce"
|
||||||
import * as nip19 from "nostr-tools/nip19"
|
import * as nip19 from "nostr-tools/nip19"
|
||||||
|
import type {Unsubscriber} from "svelte/store"
|
||||||
import {get} from "svelte/store"
|
import {get} from "svelte/store"
|
||||||
import {App, type URLOpenListenerEvent} from "@capacitor/app"
|
import {App, type URLOpenListenerEvent} from "@capacitor/app"
|
||||||
import {dev} from "$app/environment"
|
import {dev} from "$app/environment"
|
||||||
@@ -47,6 +48,8 @@
|
|||||||
|
|
||||||
const {children} = $props()
|
const {children} = $props()
|
||||||
|
|
||||||
|
const policies = [authPolicy, trustPolicy, mostlyRestrictedPolicy]
|
||||||
|
|
||||||
// Add stuff to window for convenience
|
// Add stuff to window for convenience
|
||||||
Object.assign(window, {
|
Object.assign(window, {
|
||||||
get,
|
get,
|
||||||
@@ -91,63 +94,9 @@
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Listen to navigation changes
|
const unsubscribe = call(async () => {
|
||||||
const unsubscribeHistory = setupHistory()
|
const unsubscribers: Unsubscriber[] = []
|
||||||
|
|
||||||
// Report usage on navigation change
|
|
||||||
const unsubscribeAnalytics = setupAnalytics()
|
|
||||||
|
|
||||||
// Bug tracking
|
|
||||||
const unsubscribeTracking = setupTracking()
|
|
||||||
|
|
||||||
// Load user data, listen for messages, etc
|
|
||||||
const unsubscribeApplicationData = syncApplicationData()
|
|
||||||
|
|
||||||
// Whenever we see a new pubkey, load their outbox event
|
|
||||||
const unsubscribeRepository = on(repository, "update", ({added}) => {
|
|
||||||
for (const event of added) {
|
|
||||||
loadRelaySelections(event.pubkey)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Subscribe to badge count for changes
|
|
||||||
const unsubscribeBadgeCount = notifications.badgeCount.subscribe(
|
|
||||||
notifications.handleBadgeCountChanges,
|
|
||||||
)
|
|
||||||
|
|
||||||
// Listen for signer errors, report to user via toast
|
|
||||||
const unsubscribeSignerLog = signerLog.subscribe(
|
|
||||||
throttle(10_000, $log => {
|
|
||||||
const recent = $log.slice(-10)
|
|
||||||
const success = recent.filter(spec({status: SignerLogEntryStatus.Success}))
|
|
||||||
const failure = recent.filter(spec({status: SignerLogEntryStatus.Failure}))
|
|
||||||
|
|
||||||
if (!$toast && failure.length > 5 && success.length === 0) {
|
|
||||||
pushToast({
|
|
||||||
theme: "error",
|
|
||||||
timeout: 60_000,
|
|
||||||
message: "Your signer appears to be unresponsive.",
|
|
||||||
action: {
|
|
||||||
message: "Details",
|
|
||||||
onclick: () => goto("/settings/profile"),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Sync theme
|
|
||||||
const unsubscribeTheme = theme.subscribe($theme => {
|
|
||||||
document.body.setAttribute("data-theme", $theme)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Sync font size
|
|
||||||
const unsubscribeSettings = userSettingsValues.subscribe($userSettingsValues => {
|
|
||||||
// @ts-ignore
|
|
||||||
document.documentElement.style["font-size"] = `${$userSettingsValues.font_size}rem`
|
|
||||||
})
|
|
||||||
|
|
||||||
const unsubscribeStorage = call(async () => {
|
|
||||||
// Sync stuff to localstorage
|
// Sync stuff to localstorage
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
sync({
|
sync({
|
||||||
@@ -167,29 +116,71 @@
|
|||||||
}),
|
}),
|
||||||
])
|
])
|
||||||
|
|
||||||
// Sync stuff to indexeddb
|
// Wait until data storage is initialized before syncing other stuff
|
||||||
return await storage.syncDataStores()
|
unsubscribers.push(await storage.syncDataStores())
|
||||||
|
|
||||||
|
// Add our extra policies now that we're set up
|
||||||
|
defaultSocketPolicies.push(...policies)
|
||||||
|
|
||||||
|
// Remove policies when we're done
|
||||||
|
unsubscribers.push(() => defaultSocketPolicies.splice(-policies.length))
|
||||||
|
|
||||||
|
// History, navigation, bug tracking, application data
|
||||||
|
unsubscribers.push(setupHistory(), setupAnalytics(), setupTracking(), syncApplicationData())
|
||||||
|
|
||||||
|
// Whenever we see a new pubkey, load their outbox event
|
||||||
|
unsubscribers.push(
|
||||||
|
on(repository, "update", ({added}) => {
|
||||||
|
for (const event of added) {
|
||||||
|
loadRelaySelections(event.pubkey)
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subscribe to badge count for changes
|
||||||
|
unsubscribers.push(notifications.badgeCount.subscribe(notifications.handleBadgeCountChanges))
|
||||||
|
|
||||||
|
// Listen for signer errors, report to user via toast
|
||||||
|
unsubscribers.push(
|
||||||
|
signerLog.subscribe(
|
||||||
|
throttle(10_000, $log => {
|
||||||
|
const recent = $log.slice(-10)
|
||||||
|
const success = recent.filter(spec({status: SignerLogEntryStatus.Success}))
|
||||||
|
const failure = recent.filter(spec({status: SignerLogEntryStatus.Failure}))
|
||||||
|
|
||||||
|
if (!$toast && failure.length > 5 && success.length === 0) {
|
||||||
|
pushToast({
|
||||||
|
theme: "error",
|
||||||
|
timeout: 60_000,
|
||||||
|
message: "Your signer appears to be unresponsive.",
|
||||||
|
action: {
|
||||||
|
message: "Details",
|
||||||
|
onclick: () => goto("/settings/profile"),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sync theme and font size
|
||||||
|
unsubscribers.push(
|
||||||
|
theme.subscribe($theme => {
|
||||||
|
document.body.setAttribute("data-theme", $theme)
|
||||||
|
}),
|
||||||
|
userSettingsValues.subscribe($userSettingsValues => {
|
||||||
|
// @ts-ignore
|
||||||
|
document.documentElement.style["font-size"] = `${$userSettingsValues.font_size}rem`
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
return () => unsubscribers.forEach(call)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Default socket policies
|
|
||||||
const additionalPolicies = [authPolicy, trustPolicy, mostlyRestrictedPolicy]
|
|
||||||
|
|
||||||
defaultSocketPolicies.push(...additionalPolicies)
|
|
||||||
|
|
||||||
// Cleanup on hot reload
|
// Cleanup on hot reload
|
||||||
import.meta.hot?.dispose(() => {
|
import.meta.hot?.dispose(() => {
|
||||||
App.removeAllListeners()
|
App.removeAllListeners()
|
||||||
unsubscribeHistory()
|
unsubscribe.then(call)
|
||||||
unsubscribeAnalytics()
|
|
||||||
unsubscribeTracking()
|
|
||||||
unsubscribeApplicationData()
|
|
||||||
unsubscribeRepository()
|
|
||||||
unsubscribeBadgeCount()
|
|
||||||
unsubscribeSignerLog()
|
|
||||||
unsubscribeTheme()
|
|
||||||
unsubscribeSettings()
|
|
||||||
unsubscribeStorage.then(call)
|
|
||||||
defaultSocketPolicies.splice(-additionalPolicies.length)
|
|
||||||
})
|
})
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
@@ -199,7 +190,7 @@
|
|||||||
{/if}
|
{/if}
|
||||||
</svelte:head>
|
</svelte:head>
|
||||||
|
|
||||||
{#await unsubscribeStorage}
|
{#await unsubscribe}
|
||||||
<!-- pass -->
|
<!-- pass -->
|
||||||
{:then}
|
{:then}
|
||||||
<div>
|
<div>
|
||||||
|
|||||||
Reference in New Issue
Block a user