Rill:Go语言中并发+事务的批处理开源项目


Rill(名词:小流)是一个用于流式传输、并行处理和管道构建的综合 Go 工具包。它旨在减少样板文件并简化使用,使开发人员能够专注于核心逻辑,而不会因并发的复杂性而陷入困境。

通过通道转换、类型安全、批处理和错误处理实现并发。

设计理念
rill 的核心在于一个简单而强大的概念:在由Try结构封装的包装值通道上进行操作。

  • 此类通道可以手动创建,
  • 也可以通过FromSlice或FromChan等实用程序创建,
  • 然后通过Map、Filter、FlatMap等操作进行转换。
  • 最后,当所有处理阶段完成后,可以通过ForEach、ToSlice或通过迭代结果通道来手动消耗数据 。

背景上下文
需要执行任何特殊操作即可使用 Go 并发性。大多数人编写 HTTP 服务,每个请求都已经在单独的 goroutine 中处理。您只需要确保您使用的所有其他库都是线程安全的,并且在大多数情况下,它们都是线程安全的。

当需要实现批处理以减少数据库负载时,高级并发问题就发生了:

  • 批量 DB 插入:多个执行程序处理 DB 插入。将记录收集到一个通道,然后处理该通道,分批插入。
  • 批处理数据库更新:与插入类似,但用于更新,如 UPDATE users SET last_active_at=NOW() WHERE id IN(?,?,?,?..)
  • 批量处理队列信息:收集一批消息,提取 ID,并执行一次数据库查询 WHERE id IN (...) ,然后将消息标记为已处理。

还有更多情况,不仅与 DB 有关。我很快意识到,我不想一遍又一遍地重复同样的代码,所以我做了一个泛型批处理函数。那时候 Go 还没有泛型,所以我的泛型函数实际上是基于反射的。

  • 编写基本的并行循环让我了解了 WaitGroups,
  • 而与错误处理相关的一些 bug 和 goroutine 泄露则让我了解了 ErrGroup。

有一次,我需要一个可以容纳多达 N 个唯一键的映射。尝试插入 N+1 个键时会阻塞,直到另一个键被删除。为了实现这个目标,我学习了 sync.Condition。

我还需要下载许多巨大的 CSV 文件,每个文件都包含特定日期的交易列表。之后,我需要解析和比较连续几天的 CSV。为了加快速度,我需要同时下载它们。这就是我的 Rill 模块中 Ordered* 函数的诞生过程。

同步包并没有错,在某些情况下,同步/原子是解决问题的最佳方法。

Rill主要特征

  • 轻量级:快速且模块化,可以轻松集成到现有项目中
  • 易于使用:管理 goroutine、等待组和错误处理的复杂性被抽象化了
  • Concurrent:控制所有操作的并发级别
  • 批处理:提供了一种简单的方式来批量组织和处理数据
  • 错误处理:提供一种结构化的方法来处理并发应用程序中的错误
  • 流:以最小的内存占用处理实时数据流或大型数据集
  • 顺序保留:提供保留数据原始顺序的功能,同时仍然允许并发处理
  • 高效的资源使用:goroutines 的数量和分配不取决于数据大小
  • 通用:所有操作都是类型安全的,可以与任何数据类型一起使用
  • 函数式编程:基于函数式编程概念,使map、filter、flatMap等操作可用于基于通道的工作流程

用法示例
考虑一个从多个 URL 获取键、从键值数据库批量检索其值并打印它们的应用程序。此示例展示了该库在处理并发任务、错误传播、批处理和数据流方面的优势,同时保持简单性和效率。


func main() {
    urls := rill.FromSlice([]string{
        "https://example.com/file1.txt",
       
"https://example.com/file2.txt",
       
"https://example.com/file3.txt",
       
"https://example.com/file4.txt",
    }, nil)

   
// 从每个 URL 获取键值,并将其扁平化为单一流
    keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] {
        return streamFileLines(url)
    })

   
// 从数据流中排除任何空键
    keys = rill.Filter(keys, 3, func(key string) (bool, error) {
        return key !=
"", nil
    })

   
//将密钥key整理成易于管理的批次,每批 10 个,以便批量操作
    keyBatches := rill.Batch(keys, 10, 1*time.Second)

   
// 从数据库中获取每批键的值
    resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) {
        values, err := kvMultiGet(keys...)
        if err != nil {
            return nil, err
        }

        results := make([]KV, len(keys))
        for i, key := range keys {
            results[i] = KV{Key: key, Value: values[i]}
        }

        return results, nil
    })

   
//将批次转换回单个项目进行最终处理
    results := rill.Unbatch(resultBatches)

   
//从数据流中排除任何空值
    results = rill.Filter(results, 3, func(kv KV) (bool, error) {
        return kv.Value !=
"<nil>", nil
    })

   
//遍历每个键值对并打印
    cnt := 0
    err := rill.ForEach(results, 1, func(kv KV) error {
        fmt.Println(kv.Key,
"=>", kv.Value)
        cnt++
        return nil
    })
    if err != nil {
        fmt.Println(
"Error:", err)
    }

    fmt.Println(
"Total keys:", cnt)
}

// streamFileLines 可以从 URL 逐行流式传输文件、
func streamFileLines(url string) <-chan rill.Try[string] {
   
// ...
}

// kvMultiGet 从键值数据库中进行批量读取、
func kvMultiGet(keys ...string) ([]string, error) {
   
// ...
}


批处理
批处理是并发处理中的常见模式,尤其是在处理外部服务或数据库时。 Rill 提供了Batch函数,可将项目流组织成指定大小的批次。还可以指定一个超时,之后即使批次未满,也会发出该批次。当输入流缓慢或稀疏时,这对于保持应用程序的反应非常有用。

扇入和扇出
提供了扇入和扇出数据流的机制。扇入是通过合并功能完成的,该功能将多个数据流合并到一个统一的通道中。扇出是通过Split2函数完成的,该函数将单个输入流划分为两个不同的输出通道。这种划分基于鉴别器函数,允许基于数据特征的并行处理路径。

错误处理
错误是使用ForEach处理的,这对于大多数用例都有好处。 ForEach在出现第一个错误时停止处理并返回该错误。如果您需要在管道中间处理错误,和/或在发生错误后继续处理,可以使用Catch函数。

终止和资源泄漏
在 Go 并发应用程序中,如果一个通道没有读者,那么写者就会被卡住,从而导致潜在的程序和内存泄漏。

这个问题也会延伸到基于 Go 通道构建的 rill 管道;如果管道中的任何阶段缺少消费者,上游的整个生产者链都可能被阻塞。

因此,确保管道被完全消费至关重要,尤其是在错误导致提前终止的情况下。

下面的示例演示了一种情况,即最后处理阶段在第一次遇到错误时退出,从而导致管道处于阻塞状态。

func doWork(ctx context.Context) error {
    // 初始化管道的第一阶段
    ids := streamIDs(ctx)
    
    
// Define other pipeline stages...
    
    
// 最后阶段处理
    for value := range results {
        
// Process value...
        if someCondition {
            return fmt.Errorf(
"some error") // Early exit on error
        }
    }
    return nil
}

为防止出现此类问题,建议确保在出现错误时排空结果通道。一种直接的方法是使用 defer 来调用 DrainNB:

func doWork(ctx context.Context) error {
    // Initialize the first stage of the pipeline
    ids := streamIDs(ctx)
    
   
// Define other pipeline stages...
    
   
// 确保在发生故障时排空管道
    defer rill.DrainNB(results)
    
   
// Final stage processing
    for value := range results {
       
// Process value...
        if someCondition {
            return fmt.Errorf(
"some error") // Early exit on error
        }
    }
    return nil
}

利用 ForEach 或 ToSlice 等内置排水机制的函数,可以简化代码并提高可读性:

func doWork(ctx context.Context) error {
    // Initialize the first stage of the pipeline
    ids := streamIDs(ctx)
    
   
// Define other pipeline stages...

   
// Final stage processing
    return rill.ForEach(results, 5, func(value string) error {
       
// Process value...
        if someCondition {
            return fmt.Errorf(
"some error") // 出错时提前退出,自动排水
        }
        return nil
    })
}

虽然这些措施能有效防止泄漏,但只要初始阶段产生数值,管道就可能继续在后台消耗数值。

最佳做法是使用上下文管理第一阶段(以及可能的其他阶段),从而实现受控关闭:

func doWork(ctx context.Context) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // 确保功能退出时取消第一阶段

   
// Initialize the first stage of the pipeline
    ids := streamIDs(ctx)

   
// Define other pipeline stages...

   
// Final stage processing
    return rill.ForEach(results, 5, func(value string) error {
       
// Process value
        if someCondition {
            return fmt.Errorf(
"some error") // Early exit on error, with automatic draining
        }
        return nil
    })
}

顺序保持
在并发环境中,由于并行执行的特性,保持已处理项目的原始顺序具有挑战性。当数值从输入通道读取、通过函数 f 处理并写入输出通道时,它们的顺序可能与输入顺序不一致。为了解决这个问题,rill 为其核心函数(如 OrderedMap、OrderedFilter 等)提供了有序版本。这些函数确保,如果输入通道中的值 x 在值 y 之前,那么输出中的 f(x) 就会在 f(y) 之前,从而保持原来的顺序。值得注意的是,与无序函数相比,这些有序函数会产生少量开销,这是因为需要额外的逻辑来保持顺序。

在数据顺序会影响结果的情况下,保持顺序至关重要。例如,一个应用程序需要检索特定时间段内的每日温度测量值,并计算一天到第二天的温度变化。虽然并行获取数据可以提高效率,但按照原始顺序处理数据对于准确计算温度变化至关重要。

type Measurement struct {
    Date time.Time
    Temp float64
}

func main() {
    city := "New York"
    endDate := time.Now()
    startDate := endDate.AddDate(0, 0, -30)

   
// 创建一个通道,发送开始日期和结束日期之间的所有天数
    days := make(chan rill.Try[time.Time])
    go func() {
        defer close(days)
        for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
            days <- rill.Wrap(date, nil)
        }
    }()

   
// 同时下载每天的温度
    measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
        temp, err := getTemperature(city, date)
        return Measurement{Date: date, Temp: temp}, err
    })

   
// 迭代测量结果,计算并打印变化。使用单个 goroutine
    prev := Measurement{Temp: math.NaN()}
    err := rill.ForEach(measurements, 1, func(m Measurement) error {
        change := m.Temp - prev.Temp
        prev = m

        fmt.Printf(
"%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change)
        return nil
    })
    if err != nil {
        fmt.Println(
"Error:", err)
    }
}

// getTemperature 获取城市和日期的温度读数
func getTemperature(city string, date time.Time) (float64, error) {
   
// ...
}