package engine

import (
	"fmt"
	"sync"
	"testing"

	"git.kingecg.top/kingecg/gomog/pkg/types"
)

// TestConcurrentAccess_Aggregation runs the same aggregation pipeline from
// several goroutines at once and fails if any execution returns an error.
func TestConcurrentAccess_Aggregation(t *testing.T) {
	store := NewMemoryStore(nil)
	eng := NewAggregationEngine(store)

	// Seed the collection that every worker aggregates over.
	CreateTestCollectionForTesting(store, "concurrent_test", generateDocuments(100))

	pipeline := []types.AggregateStage{
		{Stage: "$match", Spec: map[string]interface{}{"status.active": true}},
		{Stage: "$limit", Spec: float64(10)},
	}

	const workers = 10

	var wg sync.WaitGroup
	// Buffered to the worker count so a failing worker never blocks on send.
	errCh := make(chan error, workers)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := eng.Execute("concurrent_test", pipeline); err != nil {
				errCh <- err
			}
		}()
	}

	wg.Wait()
	close(errCh)

	if n := len(errCh); n > 0 {
		t.Errorf("Concurrent execution failed with %d errors", n)
		for err := range errCh {
			t.Error(err)
		}
	}
}

// TestRaceCondition_MemoryStore interleaves reads and inserts on one
// collection and fails if any store call returns an error. Run with -race to
// let the race detector inspect the store as well.
func TestRaceCondition_MemoryStore(t *testing.T) {
	store := NewMemoryStore(nil)

	// Create the collection with a single initial document.
	CreateTestCollectionForTesting(store, "race_test", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
	})

	const rounds = 10

	var wg sync.WaitGroup
	// One reader and one writer per round; size the buffer so sends never block.
	errCh := make(chan error, 2*rounds)

	for i := 0; i < rounds; i++ {
		wg.Add(2)

		// Reader.
		go func() {
			defer wg.Done()
			if _, err := store.GetAllDocuments("race_test"); err != nil {
				errCh <- err
			}
		}()

		// Writer: each round inserts a document with a distinct ID.
		go func(id int) {
			defer wg.Done()
			doc := types.Document{
				ID:   fmt.Sprintf("newdoc%d", id),
				Data: map[string]interface{}{"value": float64(id)},
			}
			if err := store.InsertDocument("race_test", doc); err != nil {
				errCh <- err
			}
		}(i)
	}

	wg.Wait()
	close(errCh)

	if n := len(errCh); n > 0 {
		t.Errorf("Race condition detected with %d errors", n)
		// Surface the individual errors so a failure is diagnosable.
		for err := range errCh {
			t.Error(err)
		}
	}
}

// TestConcurrent_UnionWith executes a pipeline with two $unionWith stages
// from several goroutines concurrently and fails on any returned error.
func TestConcurrent_UnionWith(t *testing.T) {
	store := NewMemoryStore(nil)
	eng := NewAggregationEngine(store)

	// Create the main collection and the two collections it is unioned with.
	CreateTestCollectionForTesting(store, "union_main", generateDocuments(50))
	CreateTestCollectionForTesting(store, "union_other1", generateDocuments(50))
	CreateTestCollectionForTesting(store, "union_other2", generateDocuments(50))

	pipeline := []types.AggregateStage{
		{Stage: "$unionWith", Spec: "union_other1"},
		{Stage: "$unionWith", Spec: "union_other2"},
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := eng.Execute("union_main", pipeline); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()
}

// TestConcurrent_Redact executes a $redact pipeline from several goroutines
// concurrently and fails on any returned error.
func TestConcurrent_Redact(t *testing.T) {
	store := NewMemoryStore(nil)
	eng := NewAggregationEngine(store)

	docs := make(map[string]types.Document)
	for i := 0; i < 100; i++ {
		id := fmt.Sprintf("doc%d", i)
		docs[id] = types.Document{
			ID: id,
			Data: map[string]interface{}{
				"level":  float64(i % 10),
				"secret": "classified",
				"public": "visible",
			},
		}
	}
	CreateTestCollectionForTesting(store, "redact_test", docs)

	// Keep documents whose level is >= 5, prune the rest.
	spec := map[string]interface{}{
		"$cond": map[string]interface{}{
			"if": map[string]interface{}{
				"$gte": []interface{}{"$level", float64(5)},
			},
			"then": "$$KEEP",
			"else": "$$PRUNE",
		},
	}

	pipeline := []types.AggregateStage{
		{Stage: "$redact", Spec: spec},
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := eng.Execute("redact_test", pipeline); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()
}

// TestConcurrent_OutMerge runs $out concurrently into distinct target
// collections and then checks that each target holds every source document.
func TestConcurrent_OutMerge(t *testing.T) {
	store := NewMemoryStore(nil)
	eng := NewAggregationEngine(store)

	// Source collection.
	CreateTestCollectionForTesting(store, "source_concurrent", generateDocuments(20))

	targetCollections := []string{"target1", "target2", "target3"}

	// Run one $out per target collection, all at the same time.
	var wg sync.WaitGroup
	for _, target := range targetCollections {
		wg.Add(1)
		// Pass the target as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(coll string) {
			defer wg.Done()
			pipeline := []types.AggregateStage{
				{Stage: "$out", Spec: coll},
			}
			if _, err := eng.Execute("source_concurrent", pipeline); err != nil {
				t.Error(err)
			}
		}(target)
	}
	wg.Wait()

	// Verify that every target collection was created and fully populated.
	for _, coll := range targetCollections {
		docs, err := store.GetAllDocuments(coll)
		if err != nil {
			t.Errorf("Target collection %s not found: %v", coll, err)
			// Skip the count check: it would only add a misleading second failure.
			continue
		}
		if len(docs) != 20 {
			t.Errorf("Expected 20 docs in %s, got %d", coll, len(docs))
		}
	}
}

// TestStress_LargeDataset runs a match/group/sort/limit pipeline repeatedly
// over 10000 documents and checks that it succeeds and respects the limit.
func TestStress_LargeDataset(t *testing.T) {
	store := NewMemoryStore(nil)
	eng := NewAggregationEngine(store)

	// Generate 10000 documents spread over 100 categories.
	largeDocs := make(map[string]types.Document)
	for i := 0; i < 10000; i++ {
		id := fmt.Sprintf("doc%d", i)
		largeDocs[id] = types.Document{
			ID: id,
			Data: map[string]interface{}{
				"index":    float64(i),
				"category": fmt.Sprintf("cat%d", i%100),
				"value":    float64(i) * 1.5,
				"tags":     []interface{}{"tag1", "tag2", "tag3"},
				"metadata": map[string]interface{}{"created": "2024-01-01"},
			},
		}
	}
	CreateTestCollectionForTesting(store, "stress_large", largeDocs)

	// limit is shared by the $limit stage and the assertion below so the two
	// cannot drift apart.
	const limit = 10

	pipeline := []types.AggregateStage{
		{Stage: "$match", Spec: map[string]interface{}{
			"index": map[string]interface{}{"$lt": float64(5000)},
		}},
		{Stage: "$group", Spec: map[string]interface{}{
			"_id":   "$category",
			"total": map[string]interface{}{"$sum": "$value"},
		}},
		{Stage: "$sort", Spec: map[string]interface{}{"total": -1}},
		{Stage: "$limit", Spec: float64(limit)},
	}

	// Execute several times to check that results stay stable.
	for i := 0; i < 5; i++ {
		results, err := eng.Execute("stress_large", pipeline)
		if err != nil {
			t.Fatalf("Iteration %d failed: %v", i, err)
		}
		// The pipeline ends with $limit, so more than limit results means the
		// stage was not applied.
		if len(results) > limit {
			t.Errorf("Unexpected result count: %d", len(results))
		}
	}
}

// TestConcurrent_TypeConversion calls the engine's type-conversion helpers
// from many goroutines at once; it only checks that the calls complete.
func TestConcurrent_TypeConversion(t *testing.T) {
	eng := &AggregationEngine{}

	// Results are intentionally discarded: this test exercises concurrent
	// invocation, not conversion correctness.
	conversions := []func(data map[string]interface{}){
		func(data map[string]interface{}) { _ = eng.toString("$value", data) },
		func(data map[string]interface{}) { _ = eng.toInt("$value", data) },
		func(data map[string]interface{}) { _ = eng.toDouble("$value", data) },
		func(data map[string]interface{}) { _ = eng.toBool("$value", data) },
	}

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		for _, convert := range conversions {
			wg.Add(1)
			go func(id int, convert func(map[string]interface{})) {
				defer wg.Done()
				// Each goroutine owns its own input map.
				convert(map[string]interface{}{"value": float64(id)})
			}(i, convert)
		}
	}
	wg.Wait()
}

// TestConcurrent_Bitwise calls the engine's bitwise helpers from many
// goroutines at once; it only checks that the calls complete.
func TestConcurrent_Bitwise(t *testing.T) {
	eng := &AggregationEngine{}

	// Results are intentionally discarded: this test exercises concurrent
	// invocation, not the computed values.
	ops := []func(id int){
		func(id int) {
			operand := []interface{}{float64(id), float64(id * 2)}
			_ = eng.bitAnd(operand, map[string]interface{}{})
		},
		func(id int) {
			operand := []interface{}{float64(id), float64(id * 2)}
			_ = eng.bitOr(operand, map[string]interface{}{})
		},
		func(id int) {
			operand := []interface{}{float64(id), float64(id * 2)}
			_ = eng.bitXor(operand, map[string]interface{}{})
		},
		func(id int) {
			// bitNot takes a single operand rather than a list.
			operand := float64(id)
			_ = eng.bitNot(operand, map[string]interface{}{})
		},
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		for _, op := range ops {
			wg.Add(1)
			go func(id int, op func(int)) {
				defer wg.Done()
				op(id)
			}(i, op)
		}
	}
	wg.Wait()
}