gomog/internal/engine/memory_store.go

207 lines
4.3 KiB
Go

package engine
import (
"context"
"sync"
"time"
"git.kingecg.top/kingecg/gomog/internal/database"
"git.kingecg.top/kingecg/gomog/pkg/errors"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// MemoryStore 内存数据存储
type MemoryStore struct {
mu sync.RWMutex
collections map[string]*Collection
adapter database.DatabaseAdapter
}
// Collection 内存集合
type Collection struct {
name string
documents map[string]types.Document // id -> Document
mu sync.RWMutex
}
// NewMemoryStore 创建内存存储
func NewMemoryStore(adapter database.DatabaseAdapter) *MemoryStore {
return &MemoryStore{
collections: make(map[string]*Collection),
adapter: adapter,
}
}
// LoadCollection 从数据库加载集合到内存
func (ms *MemoryStore) LoadCollection(ctx context.Context, name string) error {
// 检查集合是否存在
exists, err := ms.adapter.CollectionExists(ctx, name)
if err != nil {
return err
}
if !exists {
// 创建集合
if err := ms.adapter.CreateCollection(ctx, name); err != nil {
return err
}
}
// 从数据库加载所有文档
docs, err := ms.adapter.FindAll(ctx, name)
if err != nil {
return err
}
ms.mu.Lock()
defer ms.mu.Unlock()
coll := &Collection{
name: name,
documents: make(map[string]types.Document),
}
for _, doc := range docs {
coll.documents[doc.ID] = doc
}
ms.collections[name] = coll
return nil
}
// GetCollection 获取集合
func (ms *MemoryStore) GetCollection(name string) (*Collection, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
coll, exists := ms.collections[name]
if !exists {
return nil, errors.ErrCollectionNotFnd
}
return coll, nil
}
// Insert 插入文档到内存
func (ms *MemoryStore) Insert(collection string, doc types.Document) error {
coll, err := ms.GetCollection(collection)
if err != nil {
return err
}
coll.mu.Lock()
defer coll.mu.Unlock()
coll.documents[doc.ID] = doc
return nil
}
// Find 查询文档
func (ms *MemoryStore) Find(collection string, filter types.Filter) ([]types.Document, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return nil, err
}
coll.mu.RLock()
defer coll.mu.RUnlock()
var results []types.Document
for _, doc := range coll.documents {
if MatchFilter(doc.Data, filter) {
results = append(results, doc)
}
}
return results, nil
}
// Update 更新文档
func (ms *MemoryStore) Update(collection string, filter types.Filter, update types.Update) (int, int, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return 0, 0, err
}
coll.mu.Lock()
defer coll.mu.Unlock()
matched := 0
modified := 0
for id, doc := range coll.documents {
if MatchFilter(doc.Data, filter) {
matched++
// 应用更新
newData := applyUpdate(doc.Data, update)
coll.documents[id] = types.Document{
ID: doc.ID,
Data: newData,
CreatedAt: doc.CreatedAt,
UpdatedAt: time.Now(),
}
modified++
}
}
return matched, modified, nil
}
// Delete 删除文档
func (ms *MemoryStore) Delete(collection string, filter types.Filter) (int, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return 0, err
}
coll.mu.Lock()
defer coll.mu.Unlock()
deleted := 0
for id, doc := range coll.documents {
if MatchFilter(doc.Data, filter) {
delete(coll.documents, id)
deleted++
}
}
return deleted, nil
}
// SyncToDB 同步集合到数据库
func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error {
coll, err := ms.GetCollection(collection)
if err != nil {
return err
}
coll.mu.RLock()
defer coll.mu.RUnlock()
// 转换为文档数组
docs := make([]types.Document, 0, len(coll.documents))
for _, doc := range coll.documents {
docs = append(docs, doc)
}
// 批量插入/更新到数据库
// 注意:这里简化处理,实际应该区分新增和更新
return ms.adapter.InsertMany(ctx, collection, docs)
}
// GetAllDocuments 获取集合的所有文档(用于聚合)
func (ms *MemoryStore) GetAllDocuments(collection string) ([]types.Document, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return nil, err
}
coll.mu.RLock()
defer coll.mu.RUnlock()
docs := make([]types.Document, 0, len(coll.documents))
for _, doc := range coll.documents {
docs = append(docs, doc)
}
return docs, nil
}