Auto Retry Toggle #91

Open
JuchangGit opened this issue Aug 10, 2024 · 6 comments

Comments

@JuchangGit
JuchangGit commented Aug 10, 2024

能否添加自动重试的开关或者控制方式,因为bento目前的工作方式是输入和输出错误发生错误时会一直重试下去,这对于有些场景是不适合的。
比如:输入为数据库时,上游如果改变了表结构,那么bento将不断重试,上游可能认为这是恶意攻击

能否添加控制重试的次数或是否自动重试的开关来控制bento的默认行为

@jem-davies
Collaborator

Translation (from Google Translate, also checked by a native speaker):

Can you add a switch or some other control for automatic retries? Bento currently keeps retrying forever when an input or output error occurs, which is not suitable for some scenarios.

For example: when the input is a database and the upstream changes the table structure, Bento will keep retrying, and the upstream may treat this as a malicious attack.

Can you add a setting that limits the number of retries, or a switch that turns automatic retries on or off, to control Bento's default behavior?

@jem-davies jem-davies self-assigned this Aug 10, 2024
@jem-davies
Collaborator

jem-davies commented Aug 10, 2024

你可以使用 streams mode 来解决这个问题。这种方法可以使用一个/ready endpoint 来报错没有被连接的 streams(bento configs)。然后你可以对他们执行一个CRUD操作来删除那些报错的stream。

这里我有一个 stream (config.yaml) 可以通过endpoint来检查并且删除那些连接失败的stream。

You can solve this with streams mode. It lets you poll a /ready endpoint that reports which streams (bento configs) are not connected. You can then perform CRUD operations on the streams, for example deleting the offending ones.

Here I have a stream (config.yaml) that polls the endpoint and deletes any stream that reports not ready:

sql_test
├── resources.yaml
└── streams
    ├── config.yaml
    ├── sql.yaml
    └── sql_2.yaml

./resources.yaml

rate_limit_resources:
  - label: ready_rate_limit
    local:
      count: 1
      interval: 2s

./streams/config.yaml

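input:
  # Poll Bento's own /ready endpoint, throttled by ready_rate_limit (once every 2s).
  http_client:
    url: "http://0.0.0.0:4195/ready"
    verb: GET
    timeout: 5s
    # Treat 503 (some streams not connected) as success so the body is processed.
    successful_on: [503]
    rate_limit: ready_rate_limit

pipeline:
  processors:
      # All streams are connected: drop the message, nothing to delete.
      - switch:
          - check: content().string() == "OK"
            processors:
              - mapping: |
                  root = deleted()

      - mapping: |
          root = {"response": content().string()}

      # Extract the stream names from "streams a, b are not connected".
      - mapping: |
          root.streams = this.response.replace("streams ", "").replace(" are not connected\n", "").split(", ")

      # Delete the first stream listed; any others are picked up by later polls.
      - branch:
          processors:
              - http:
                  url: "http://0.0.0.0:4195/streams/${! streams.0 }"
                  verb: DELETE
                  timeout: 5s

      - mapping: |
          root = {"deleted_stream":streams.0}

output:
  stdout: {}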

./streams/sql.yaml

input:
  sql_select:
    driver: postgres
    dsn: postgres://admin:[email protected]:5432/test_bento?sslmode=disable
    table: people_copy
    columns: ['first_name', 'last_name', 'age']

output:
  stdout: {}

./streams/sql_2.yaml (same as sql.yaml)

Run bento:

bento -r resources.yaml streams ./streams/*.yaml 

Output:

INFO Running without a main config file            @service=bento bento_version=""
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=bento
INFO Output type stdout is now active              @service=bento label="" path=root.output stream=sql_2
INFO Input type http_client is now active          @service=bento label="" path=root.input stream=config
INFO Output type stdout is now active              @service=bento label="" path=root.output stream=sql
INFO Output type stdout is now active              @service=bento label="" path=root.output stream=config
INFO Launching Bento in streams mode, use CTRL+C to close  @service=bento
ERRO Failed to connect to sql_select: pq: column "first_name" does not exist  @service=bento label="" path=root.input stream=sql_2
{"deleted_stream":"sql"}
ERRO Failed to connect to sql_select: pq: column "first_name" does not exist  @service=bento label="" path=root.input stream=sql_2
ERRO Failed to connect to sql_select: pq: column "first_name" does not exist  @service=bento label="" path=root.input stream=sql_2
{"deleted_stream":"sql_2"}
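The string handling in the Bloblang mapping above can also be sketched outside Bento. This is only an illustration in Python, assuming the /ready body is either "OK" or has the shape implied by the mapping ("streams a, b are not connected\n"). The function name `parse_not_ready` is hypothetical and not part of Bento.

```python
def parse_not_ready(body: str) -> list[str]:
    """Return the stream names listed in a /ready response body.

    Assumes the body is either "OK" or looks like
    "streams sql, sql_2 are not connected\n", as implied by the mapping.
    """
    text = body.strip()
    if text == "OK":
        return []
    prefix, suffix = "streams ", " are not connected"
    if not (text.startswith(prefix) and text.endswith(suffix)):
        raise ValueError(f"unexpected /ready body: {body!r}")
    # Keep only the comma-separated names between the prefix and suffix.
    names = text[len(prefix):len(text) - len(suffix)]
    return [name.strip() for name in names.split(",") if name.strip()]
```

Unlike config.yaml, which deletes only the first listed stream per poll, this returns every name in the response, so a caller could delete them all in one pass.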

@JuchangGit
Author

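That is indeed a workable approach, but it is not what I am after.
My use case is to run bento as an executor: a scheduler invokes bento to process data, and it only needs to launch bento and check its exit status to decide how to run the next task.
Like this:
a→b→c
a, b and c are all bento instances; if a fails, neither b nor c runs.
What I want is for bento to exit with a non-zero status code whenever an error occurs, whether in the input, pipeline or output.

I am not sure whether I have explained this clearly.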
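The scheduler side of the a→b→c chain can be sketched as follows. This is a minimal Python illustration, not an existing tool: `run_chain` is a hypothetical name, and it only helps if each step really exits with a non-zero status on failure, which, per this thread, bento does not currently do for connection errors because it keeps retrying.

```python
import subprocess


def run_chain(commands: list[list[str]]) -> int:
    """Run commands in order and stop at the first non-zero exit status.

    Returns 0 if every command succeeded, otherwise the failing status.
    Later commands are not started once one has failed.
    """
    for argv in commands:
        status = subprocess.run(argv).returncode
        if status != 0:
            return status
    return 0
```

A scheduler could call it as `run_chain([["bento", "-c", "a.yaml"], ["bento", "-c", "b.yaml"], ["bento", "-c", "c.yaml"]])`, where the config file names are placeholders, and use the return value as its own exit status.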

@jem-davies
Collaborator

#92

I added this ticket to consider adding an option for this. However, I'm pretty sure the original project that this was forked from never intended to support it.

It is by design that bento should never stop trying to connect to an input.

@JuchangGit
Author

JuchangGit commented Aug 22, 2024

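Could inputs and outputs get a configuration option for the maximum number of retries, max_retry_num, with a default of -1 meaning retry forever (the same as the current behavior), so that users can control how many retries happen? The config would look like this: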

input:
  max_retry_num: 2
  stdin:
    scanner:
      lines: {}
    auto_replay_nacks: true
buffer:
  none: {}
pipeline:
  threads: -1
  processors: []
output:
  max_retry_num: 3
  stdout:
    codec: lines
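The semantics proposed for max_retry_num can be sketched like this. It is a Python illustration only: max_retry_num is the field proposed in this comment and does not exist in bento, `connect_with_retries` and `delay_seconds` are hypothetical names, and treating the value as "retries after the first attempt" is an assumption.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retries(
    connect: Callable[[], T],
    max_retry_num: int = -1,
    delay_seconds: float = 1.0,
) -> T:
    """Call connect() until it succeeds or the retry budget is spent.

    max_retry_num = -1 retries forever; N >= 0 allows N retries after the
    first attempt, then re-raises the last error.
    """
    retries = 0
    while True:
        try:
            return connect()
        except Exception:
            if max_retry_num >= 0 and retries >= max_retry_num:
                raise  # budget exhausted: surface the error to the caller
            retries += 1
            time.sleep(delay_seconds)
```

With max_retry_num: 2, a connection that always fails is attempted three times in total before the error is raised, at which point a process could exit with a non-zero status.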

@JuchangGit
Author

#91 (comment) may have a bug: when the database connection string in ./streams/sql_2.yaml is wrong (a wrong IP or port), /ready returns ok.
