fix: Add functions to the Kv API in order to query ranges - getLessOrEqual forgot about data migration
This commit is contained in:
70
src/cache/impl/LevelDbCache.ts
vendored
70
src/cache/impl/LevelDbCache.ts
vendored
@@ -4,7 +4,7 @@ import { MemoryLevel } from 'memory-level';
|
||||
import { CacheOptions } from '../../core/WarpFactory';
|
||||
import { LoggerFactory } from '../../logging/LoggerFactory';
|
||||
import { SortKeyCacheRangeOptions } from '../SortKeyCacheRangeOptions';
|
||||
import { AbstractSublevelOptions } from 'abstract-level/types/abstract-sublevel';
|
||||
import { AbstractSublevel, AbstractSublevelOptions } from 'abstract-level/types/abstract-sublevel';
|
||||
import { AbstractChainedBatch } from 'abstract-level/types/abstract-chained-batch';
|
||||
import { AbstractKeyIteratorOptions } from 'abstract-level/types/abstract-iterator';
|
||||
|
||||
@@ -79,23 +79,11 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(cacheKey.key, this.subLevelOptions);
|
||||
// manually opening to fix https://github.com/Level/level/issues/221
|
||||
await contractCache.open();
|
||||
try {
|
||||
const result: ClientValueWrapper<V> = await contractCache.get(cacheKey.sortKey);
|
||||
let resultValue: V;
|
||||
if (result.tomb === undefined && result.value === undefined) {
|
||||
resultValue = result as V;
|
||||
} else {
|
||||
resultValue = result.tomb ? null : result.value;
|
||||
const subLevelValue = await this.getValueFromLevel(cacheKey.sortKey, contractCache);
|
||||
if (subLevelValue) {
|
||||
return new SortKeyCacheResult<V>(cacheKey.sortKey, subLevelValue);
|
||||
}
|
||||
return new SortKeyCacheResult<V>(cacheKey.sortKey, resultValue);
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} catch (e: any) {
|
||||
if (e.code == 'LEVEL_NOT_FOUND') {
|
||||
return null;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async getLast(key: string): Promise<SortKeyCacheResult<V> | null> {
|
||||
@@ -104,12 +92,9 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
await contractCache.open();
|
||||
const keys = await contractCache.keys({ reverse: true, limit: 1 }).all();
|
||||
if (keys.length) {
|
||||
const lastValueWrap = await contractCache.get(keys[0]);
|
||||
if (lastValueWrap.tomb === undefined && lastValueWrap.value === undefined) {
|
||||
return new SortKeyCacheResult<V>(keys[0], lastValueWrap as V);
|
||||
}
|
||||
if (!lastValueWrap.tomb) {
|
||||
return new SortKeyCacheResult<V>(keys[0], lastValueWrap.value);
|
||||
const subLevelValue = await this.getValueFromLevel(keys[0], contractCache);
|
||||
if (subLevelValue) {
|
||||
return new SortKeyCacheResult<V>(keys[0], subLevelValue);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
@@ -121,9 +106,35 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
await contractCache.open();
|
||||
const keys = await contractCache.keys({ reverse: true, lte: sortKey, limit: 1 }).all();
|
||||
if (keys.length) {
|
||||
const cachedVal = await contractCache.get(keys[0]);
|
||||
if (!cachedVal.tomb) {
|
||||
return new SortKeyCacheResult<V>(keys[0], cachedVal.value);
|
||||
const subLevelValue = await this.getValueFromLevel(keys[0], contractCache);
|
||||
if (subLevelValue) {
|
||||
return new SortKeyCacheResult<V>(keys[0], subLevelValue);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private async getValueFromLevel(
|
||||
key: string,
|
||||
level: AbstractSublevel<
|
||||
MemoryLevel<string, ClientValueWrapper<V>>,
|
||||
string | Buffer | Uint8Array,
|
||||
string,
|
||||
ClientValueWrapper<V>
|
||||
>
|
||||
): Promise<V | null> {
|
||||
try {
|
||||
const wrappedValue = await level.get(key);
|
||||
if (wrappedValue && wrappedValue.tomb === undefined && wrappedValue.value === undefined) {
|
||||
return wrappedValue as V;
|
||||
}
|
||||
if (wrappedValue && wrappedValue.tomb === false && wrappedValue.value != null) {
|
||||
return wrappedValue.value;
|
||||
}
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} catch (e: any) {
|
||||
if (e.code != 'LEVEL_NOT_FOUND') {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
@@ -215,8 +226,12 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
|
||||
try {
|
||||
transactionMarkValue = await this.db.get(this.ongoingTransactionMark);
|
||||
// eslint-disable-next-line no-empty
|
||||
} catch (error) {}
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} catch (e: any) {
|
||||
if (e.code != 'LEVEL_NOT_FOUND') {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
if (transactionMarkValue == 'ongoing') {
|
||||
throw new Error(`Database seems to be in inconsistent state. The previous transaction has not finished.`);
|
||||
@@ -281,6 +296,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
async kvMap(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<Map<string, V>> {
|
||||
const result: Map<string, V> = new Map();
|
||||
const allKeys = (await this.db.keys(this.levelRangeOptions(options)).all())
|
||||
.filter((k) => k != this.ongoingTransactionMark)
|
||||
.filter((k) => !sortKey || this.extractSortKey(k).localeCompare(sortKey) <= 0)
|
||||
.map((k) => this.extractOriginalKey(k));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user