diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 81cc059bfa..6cf0d610fc 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -103,7 +103,7 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req ente.InsertOrUpdateEmb log.Error(uploadErr) return nil, stacktrace.Propagate(uploadErr, "") } - embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version) + embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version, c.S3Config.GetDerivedStorageDataCenter()) embedding.Version = &version if err != nil { return nil, stacktrace.Propagate(err, "") diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index 86915fde51..bcbe822e4c 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -18,15 +18,26 @@ type Repository struct { } // Create inserts a new embedding - -func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int, version int) (ente.Embedding, error) { +func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int, version int, dc string) (ente.Embedding, error) { var updatedAt int64 - err := r.DB.QueryRowContext(ctx, `INSERT INTO embeddings - (file_id, owner_id, model, size, version) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model - DO UPDATE SET updated_at = now_utc_micro_seconds(), size = $4, version = $5 - RETURNING updated_at`, entry.FileID, ownerID, entry.Model, size, version).Scan(&updatedAt) + err := r.DB.QueryRowContext(ctx, ` + INSERT INTO embeddings + (file_id, owner_id, model, size, version, datacenters) + VALUES + ($1, $2, $3, $4, $5, ARRAY[$6]::s3region[]) + ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model + DO UPDATE + SET + updated_at = now_utc_micro_seconds(), + size = $4, + version = $5, + datacenters = CASE + WHEN $6 = ANY(COALESCE(embeddings.datacenters, ARRAY['b2-eu-cen']::s3region[])) THEN embeddings.datacenters + ELSE array_append(COALESCE(embeddings.datacenters, ARRAY['b2-eu-cen']::s3region[]), $6::s3region) + END + RETURNING updated_at`, + entry.FileID, ownerID, entry.Model, size, version, dc).Scan(&updatedAt) + if err != nil { // check if error is due to model enum invalid value if err.Error() == fmt.Sprintf("pq: invalid input value for enum model: \"%s\"", entry.Model) {