gomog/internal/engine/concurrency_test.go

334 lines
7.7 KiB
Go

package engine
import (
"fmt"
"sync"
"testing"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// TestConcurrentAccess_Aggregation runs the same $match/$limit pipeline from
// several goroutines against one shared AggregationEngine and fails if any
// Execute call returns an error.
func TestConcurrentAccess_Aggregation(t *testing.T) {
	const workers = 10

	store := NewMemoryStore(nil)
	agg := NewAggregationEngine(store)

	// Seed the collection that every worker reads from.
	CreateTestCollectionForTesting(store, "concurrent_test", generateDocuments(100))

	pipeline := []types.AggregateStage{
		{Stage: "$match", Spec: map[string]interface{}{"status.active": true}},
		{Stage: "$limit", Spec: float64(10)},
	}

	// Buffered to the worker count so a failing worker never blocks on send.
	errCh := make(chan error, workers)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			if _, err := agg.Execute("concurrent_test", pipeline); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	close(errCh)

	// Drain the closed channel so the count and the details come from one place.
	var failures []error
	for err := range errCh {
		failures = append(failures, err)
	}
	if len(failures) == 0 {
		return
	}

	t.Errorf("Concurrent execution failed with %d errors", len(failures))
	for _, err := range failures {
		t.Error(err)
	}
}
// TestRaceCondition_MemoryStore issues concurrent GetAllDocuments reads and
// InsertDocument writes against a single collection of one MemoryStore and
// fails if any of those calls returns an error.
//
// The test itself only observes returned errors; run it with `go test -race`
// to have the race detector flag unsynchronised memory access.
func TestRaceCondition_MemoryStore(t *testing.T) {
	const pairs = 10 // number of reader/writer goroutine pairs

	store := NewMemoryStore(nil)

	// Create the collection with a single seed document.
	CreateTestCollectionForTesting(store, "race_test", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
	})

	var wg sync.WaitGroup
	// One slot per goroutine so sends never block.
	errCh := make(chan error, 2*pairs)

	for i := 0; i < pairs; i++ {
		wg.Add(2)

		// Reader.
		go func() {
			defer wg.Done()
			if _, err := store.GetAllDocuments("race_test"); err != nil {
				errCh <- err
			}
		}()

		// Writer: each goroutine inserts a document with a distinct ID.
		go func(id int) {
			defer wg.Done()
			doc := types.Document{
				ID:   fmt.Sprintf("newdoc%d", id),
				Data: map[string]interface{}{"value": float64(id)},
			}
			if err := store.InsertDocument("race_test", doc); err != nil {
				errCh <- err
			}
		}(i)
	}

	wg.Wait()
	close(errCh)

	if n := len(errCh); n > 0 {
		t.Errorf("Race condition detected with %d errors", n)
		// Report every error, not just the count, so failures can be diagnosed.
		for err := range errCh {
			t.Error(err)
		}
	}
}
// TestConcurrent_UnionWith executes a pipeline with two $unionWith stages
// from several goroutines at once and reports any error Execute returns.
func TestConcurrent_UnionWith(t *testing.T) {
	const workers = 5

	store := NewMemoryStore(nil)
	agg := NewAggregationEngine(store)

	// One main collection plus the two collections that get unioned in.
	for _, name := range []string{"union_main", "union_other1", "union_other2"} {
		CreateTestCollectionForTesting(store, name, generateDocuments(50))
	}

	pipeline := []types.AggregateStage{
		{Stage: "$unionWith", Spec: "union_other1"},
		{Stage: "$unionWith", Spec: "union_other2"},
	}

	var wg sync.WaitGroup
	runOnce := func() {
		defer wg.Done()
		if _, err := agg.Execute("union_main", pipeline); err != nil {
			t.Error(err)
		}
	}

	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go runOnce()
	}
	wg.Wait()
}
// TestConcurrent_Redact executes a $redact pipeline from several goroutines
// at once and reports any error Execute returns.
func TestConcurrent_Redact(t *testing.T) {
	const (
		docCount = 100
		workers  = 10
	)

	store := NewMemoryStore(nil)
	agg := NewAggregationEngine(store)

	// Documents cycle through levels 0-9; the other two fields are constant.
	docs := make(map[string]types.Document)
	for n := 0; n < docCount; n++ {
		id := fmt.Sprintf("doc%d", n)
		docs[id] = types.Document{
			ID: id,
			Data: map[string]interface{}{
				"level":  float64(n % 10),
				"secret": "classified",
				"public": "visible",
			},
		}
	}
	CreateTestCollectionForTesting(store, "redact_test", docs)

	// $cond on level >= 5: "$$KEEP" when true, "$$PRUNE" otherwise.
	levelAtLeastFive := map[string]interface{}{
		"$gte": []interface{}{"$level", float64(5)},
	}
	spec := map[string]interface{}{
		"$cond": map[string]interface{}{
			"if":   levelAtLeastFive,
			"then": "$$KEEP",
			"else": "$$PRUNE",
		},
	}
	pipeline := []types.AggregateStage{{Stage: "$redact", Spec: spec}}

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			if _, err := agg.Execute("redact_test", pipeline); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()
}
// TestConcurrent_OutMerge runs one $out pipeline per target collection
// concurrently from the same source collection, then checks that every
// target can be read back and holds as many documents as the source was
// seeded with. Only $out is exercised here; $merge is not.
func TestConcurrent_OutMerge(t *testing.T) {
	const sourceDocs = 20

	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	// Source collection shared by all writers.
	CreateTestCollectionForTesting(store, "source_concurrent", generateDocuments(sourceDocs))

	targetCollections := []string{"target1", "target2", "target3"}

	// Each goroutine writes the source to its own, distinct target collection.
	var wg sync.WaitGroup
	for _, target := range targetCollections {
		wg.Add(1)
		go func(coll string) {
			defer wg.Done()
			pipeline := []types.AggregateStage{
				{Stage: "$out", Spec: coll},
			}
			if _, err := engine.Execute("source_concurrent", pipeline); err != nil {
				t.Error(err)
			}
		}(target)
	}
	wg.Wait()

	// Verify every target collection exists and has the expected size.
	for _, coll := range targetCollections {
		docs, err := store.GetAllDocuments(coll)
		if err != nil {
			// Surface the underlying error and skip the size check, which
			// would only add a second, misleading failure for this target.
			t.Errorf("Target collection %s not found: %v", coll, err)
			continue
		}
		if len(docs) != sourceDocs {
			t.Errorf("Expected %d docs in %s, got %d", sourceDocs, coll, len(docs))
		}
	}
}
// TestStress_LargeDataset builds a 10000-document collection and runs a
// $match -> $group -> $sort -> $limit pipeline over it several times,
// failing on any Execute error or if more results come back than the final
// $limit stage allows.
func TestStress_LargeDataset(t *testing.T) {
	const (
		docCount   = 10000
		categories = 100
		limit      = 10 // value of the trailing $limit stage
		iterations = 5
	)

	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	// Generate the documents; the size is known, so pre-size the map.
	largeDocs := make(map[string]types.Document, docCount)
	for i := 0; i < docCount; i++ {
		id := fmt.Sprintf("doc%d", i)
		largeDocs[id] = types.Document{
			ID: id,
			Data: map[string]interface{}{
				"index":    float64(i),
				"category": fmt.Sprintf("cat%d", i%categories),
				"value":    float64(i) * 1.5,
				"tags":     []interface{}{"tag1", "tag2", "tag3"},
				"metadata": map[string]interface{}{"created": "2024-01-01"},
			},
		}
	}
	CreateTestCollectionForTesting(store, "stress_large", largeDocs)

	pipeline := []types.AggregateStage{
		{Stage: "$match", Spec: map[string]interface{}{
			"index": map[string]interface{}{"$lt": float64(5000)},
		}},
		{Stage: "$group", Spec: map[string]interface{}{
			"_id":   "$category",
			"total": map[string]interface{}{"$sum": "$value"},
		}},
		{Stage: "$sort", Spec: map[string]interface{}{"total": -1}},
		{Stage: "$limit", Spec: float64(limit)},
	}

	// Run repeatedly to check the result stays stable across executions.
	for i := 0; i < iterations; i++ {
		results, err := engine.Execute("stress_large", pipeline)
		if err != nil {
			t.Fatalf("Iteration %d failed: %v", i, err)
		}
		// The pipeline ends in $limit, so that is the real upper bound; the
		// looser per-category bound could never catch a broken $limit.
		if len(results) > limit {
			t.Errorf("Unexpected result count: %d", len(results))
		}
	}
}
// TestConcurrent_TypeConversion calls the toString, toInt, toDouble and
// toBool helpers of a zero-value AggregationEngine from many goroutines at
// once. Results are discarded; the test only fails on a panic (or, when run
// with -race, on a detected data race).
func TestConcurrent_TypeConversion(t *testing.T) {
	const rounds = 20

	agg := &AggregationEngine{}

	// One entry per conversion helper, launched in this order every round.
	conversions := []func(data map[string]interface{}){
		func(data map[string]interface{}) { _ = agg.toString("$value", data) },
		func(data map[string]interface{}) { _ = agg.toInt("$value", data) },
		func(data map[string]interface{}) { _ = agg.toDouble("$value", data) },
		func(data map[string]interface{}) { _ = agg.toBool("$value", data) },
	}

	var wg sync.WaitGroup
	for round := 0; round < rounds; round++ {
		for _, convert := range conversions {
			wg.Add(1)
			go func(id int, convert func(map[string]interface{})) {
				defer wg.Done()
				// Each goroutine gets its own document, so no map is shared.
				convert(map[string]interface{}{"value": float64(id)})
			}(round, convert)
		}
	}
	wg.Wait()
}
// TestConcurrent_Bitwise calls the bitAnd, bitOr, bitXor and bitNot helpers
// of a zero-value AggregationEngine from many goroutines at once. Results
// are discarded; the test only fails on a panic (or, when run with -race, on
// a detected data race).
func TestConcurrent_Bitwise(t *testing.T) {
	const rounds = 10

	agg := &AggregationEngine{}

	// Operand pair used by the binary operators: (id, 2*id).
	pairFor := func(id int) []interface{} {
		return []interface{}{float64(id), float64(id * 2)}
	}

	// One entry per operator, launched in this order every round. Each call
	// builds its own operand and empty document, so nothing is shared.
	operators := []func(id int){
		func(id int) { _ = agg.bitAnd(pairFor(id), map[string]interface{}{}) },
		func(id int) { _ = agg.bitOr(pairFor(id), map[string]interface{}{}) },
		func(id int) { _ = agg.bitXor(pairFor(id), map[string]interface{}{}) },
		func(id int) { _ = agg.bitNot(float64(id), map[string]interface{}{}) },
	}

	var wg sync.WaitGroup
	for round := 0; round < rounds; round++ {
		for _, apply := range operators {
			wg.Add(1)
			go func(id int, apply func(int)) {
				defer wg.Done()
				apply(id)
			}(round, apply)
		}
	}
	wg.Wait()
}