Move wip
This commit is contained in:
@@ -309,75 +309,6 @@ const putEmbedding = async (
|
||||
ensureOk(res);
|
||||
};
|
||||
|
||||
interface PullEmbeddingsResult {
|
||||
/**
|
||||
* Derived data indexed by the file id whose data this is.
|
||||
*/
|
||||
items: Map<number, RemoteDerivedData>;
|
||||
/**
|
||||
* The latest {@link updatedAt} epoch milliseconds from all the derived data
|
||||
* in {@link items}.
|
||||
*/
|
||||
latestUpdatedAt: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch derived data that has been created or updated since the given
|
||||
* {@link sinceTime}.
|
||||
*
|
||||
* This allows a client to perform a quick "diff" and get embeddings that have
|
||||
* changed since the last time it checked. By fetching these all upfront instead
|
||||
* of doing them one by one during the indexing, we can speed up the initial
|
||||
* sync of existing embeddings on a new client.
|
||||
*
|
||||
* @param sinceTime Epoch milliseconds. We use this to ask remote to provide us
|
||||
* derived data whose {@link updatedAt} is more than the given value. If not
|
||||
* specified, then we'll start from the beginning.
|
||||
*
|
||||
* @param limit An advisory limit on the number of items to return.
|
||||
*
|
||||
* @returns a map of {@link RemoteDerivedData} indexed by the id of the file
|
||||
* whose derived data it is, and the latest {@link updatedAt} from amongst all
|
||||
* the data that was fetched.
|
||||
*
|
||||
* The caller should persist the returned timestamp for use in subsequent calls
|
||||
* to {@link pullEmbeddings} to resume pulling from the current checkpoint.
|
||||
*
|
||||
* Returns undefined if nothing more is left to pull.
|
||||
*/
|
||||
const pullEmbeddings = async (
|
||||
model: EmbeddingModel,
|
||||
sinceTime: number | undefined,
|
||||
limit: number,
|
||||
): Promise<PullEmbeddingsResult | undefined> => {
|
||||
// If since time is not provided, start at 0 (the beginning).
|
||||
let latestUpdatedAt = sinceTime ?? 0;
|
||||
|
||||
// See if anything changed since then.
|
||||
const indexedFiles = await getIndexedFiles(
|
||||
"derived",
|
||||
latestUpdatedAt,
|
||||
limit,
|
||||
);
|
||||
|
||||
// Nope. Nothing more is left to do.
|
||||
if (!indexedFiles.length) return undefined;
|
||||
|
||||
// Find the latest from amongst the given updatedAt. This'll serve as our
|
||||
// checkpoint for the next pull.
|
||||
latestUpdatedAt = indexedFiles.reduce(
|
||||
(max, { updatedAt }) => Math.max(max, updatedAt),
|
||||
latestUpdatedAt,
|
||||
);
|
||||
|
||||
// Fetch the embeddings for these guys. In rare cases, remote might return a
|
||||
// partial response, but that will not have any lasting impact since we
|
||||
// anyways refetch the derived data before attempting indexing.
|
||||
const items = await fetchDerivedData()
|
||||
|
||||
// getIndexedFiles(model)
|
||||
};
|
||||
|
||||
/** A single entry in the response of {@link getIndexedFiles}. */
|
||||
const IndexedFile = z.object({
|
||||
fileID: z.number(),
|
||||
@@ -387,21 +318,32 @@ const IndexedFile = z.object({
|
||||
type IndexedFile = z.infer<typeof IndexedFile>;
|
||||
|
||||
/**
|
||||
* Fetch the file ids for {@link model} embeddings that have been created or
|
||||
* Fetch the file ids whose {@link model} derived data has been created or
|
||||
* updated since the given {@link sinceTime}.
|
||||
*
|
||||
* See {@link pullEmbeddings} for details about the parameters.
|
||||
* This allows a client to perform a quick "diff" and first fetch all derived
|
||||
* data that has changed since the last time it checked. By fetching these all
|
||||
* upfront instead of doing them one by one during the indexing, we can speed up
|
||||
* the initial sync of existing embeddings on a new client.
|
||||
*
|
||||
* @param sinceTime Epoch milliseconds. We use this to ask remote to provide us
|
||||
* derived data whose {@link updatedAt} is more than the given value. If not
|
||||
* specified, then we'll start from the beginning.
|
||||
*
|
||||
* @param limit An advisory limit on the number of items to return.
|
||||
*
|
||||
* @returns an array of file ids, each with an associated timestamp when the
|
||||
* embedding for that file was last changed.
|
||||
* derived data for that file was last changed.
|
||||
*
|
||||
* The caller should persist the latest amongst these timestamps and use it in
|
||||
* subsequent calls to resume pulling from the current checkpoint.
|
||||
*/
|
||||
const getIndexedFiles = async (
|
||||
model: EmbeddingModel,
|
||||
export const getIndexedDerivedDataFiles = async (
|
||||
sinceTime: number,
|
||||
limit: number,
|
||||
): Promise<IndexedFile[]> => {
|
||||
const params = new URLSearchParams({
|
||||
model,
|
||||
model: "derived",
|
||||
sinceTime: sinceTime.toString(),
|
||||
limit: limit.toString(),
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { clientPackageName } from "@/base/app";
|
||||
import { isHTTP4xxError } from "@/base/http";
|
||||
import { getKVN } from "@/base/kv";
|
||||
import { getKVN, setKV } from "@/base/kv";
|
||||
import { ensureAuthToken } from "@/base/local-user";
|
||||
import log from "@/base/log";
|
||||
import type { ElectronMLWorker } from "@/base/types/ipc";
|
||||
@@ -34,6 +34,7 @@ import {
|
||||
} from "./db";
|
||||
import {
|
||||
fetchDerivedData,
|
||||
getIndexedDerivedDataFiles,
|
||||
putDerivedData,
|
||||
type RawRemoteDerivedData,
|
||||
type RemoteDerivedData,
|
||||
@@ -311,8 +312,43 @@ expose(MLWorker);
|
||||
*
|
||||
* Return true if at least one embedding was pulled.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
const pull = async () => {
|
||||
// If we've never pulled before, start at the beginning (0).
|
||||
return pullSince((await latestDerivedDataUpdatedAt()) ?? 0);
|
||||
};
|
||||
|
||||
const latestDerivedDataUpdatedAt = () => getKVN("latestDerivedDataUpdatedAt");
|
||||
|
||||
const setLatestDerivedDataUpdatedAt = (n: number) =>
|
||||
setKV("latestDerivedDataUpdatedAt", n);
|
||||
|
||||
const pullSince = async (sinceTime: number) => {
|
||||
// See if anything has changed since `sinceTime`.
|
||||
const indexedFiles = await getIndexedDerivedDataFiles(sinceTime, 200);
|
||||
|
||||
// Nope. Nothing more is left to do.
|
||||
if (!indexedFiles.length) return undefined;
|
||||
|
||||
// Find the latest from amongst all the updatedAt we got back. This'll serve
|
||||
// as our checkpoint for the next pull.
|
||||
const latestUpdatedAt = indexedFiles.reduce(
|
||||
(max, { updatedAt }) => Math.max(max, updatedAt),
|
||||
sinceTime,
|
||||
);
|
||||
|
||||
// Fetch the embeddings for the files which changed.
|
||||
//
|
||||
// In rare cases, remote might return a partial response, but that will not
|
||||
// have any lasting impact since we anyways refetch the derived data before
|
||||
// attempting indexing.
|
||||
|
||||
|
||||
|
||||
|
||||
const items = await fetchDerivedData();
|
||||
|
||||
// getIndexedFiles(model)
|
||||
|
||||
return "";
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user