欢迎光临
我们一直在努力

Go 中的 Apache Beam ParDo 过滤器

go 中的 apache beam pardo 过滤器

问题内容

我是一名 python 开发人员,但应该使用 go 制作数据流管道。
与 python 或 java 相比,我找不到那么多使用 go 的 apache beam 示例。

我有以下代码,其中具有用户名和年龄的结构。任务是增加年龄,然后根据年龄进行过滤。我找到了增加年龄的方法,但卡在过滤部分。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.registerfunction(incrementage)
}

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func incrementage(list user) user {
    list.age++
    return list
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 40},
        {"adam", 50},
        {"john", 35},
        {"ben", 8},
    }
    initial := beam.createlist(s, userlist)

    pc := beam.pardo(s, incrementage, initial)

    pc1 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.pardo0(s, printrow, pc1)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

我尝试创建一个如下所示的函数,但这返回一个布尔值而不是用户对象。我知道我错过了一些简单但无法弄清楚的事情。

func filterage(list user) user {
    return list.age > 40    
}

在 python 中,我可以编写如下所示的函数。

beam.Filter(lambda line: line["Age"] >= 40))

正确答案

您需要在函数中添加一个发射器来发射用户:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

正如您当前代码中所写, 返回 list.age > 40list.age > 40 首先评估为 true(布尔值),并且返回该布尔值。

赞(0) 打赏
未经允许不得转载:码农资源网 » Go 中的 Apache Beam ParDo 过滤器
分享到

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

登录

找回密码

注册