Why does Filebeat report this error when consuming from Kafka and writing to Doris?

The error is "Failed to serialize the event: key not found", yet the data is actually being written.

5 Answers

dissect has a hard time with inconsistent formats like this. You can use the Filebeat grok plugin instead, which can do complex processing the way Logstash does.
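To illustrate the point about inconsistent formats: the tokenizer posted further down in this thread expects space-separated fields with bracketed thread, tid, and class name, while the Kafka sample carries a pipe-delimited message. The sketch below is plain JavaScript, not the grok plugin itself; it only shows the grok-style idea of trying several patterns in order. `PATTERNS` and `parseLine` are hypothetical names, and the pipe-delimited field names `app` and `trace` are guesses based on the sample.

```javascript
// Hypothetical pattern list, tried in order (the way a grok pattern list is).
var PATTERNS = [
  {
    // Bracketed layout: "<day> <time> <level> [thread] [tid] [class] <sep> <log>"
    re: /^(\S+) (\S+) (\S+)\s+\[([^\]]*)\] \[([^\]]*)\] \[([^\]]*)\] (\S+) ([\s\S]*)$/,
    keys: ["day", "time", "log_level", "thread", "tid", "classname", "hengxian", "log"]
  },
  {
    // Pipe-delimited layout: "<day> <time>|<level>|<app>|<trace>|<rest of line>"
    re: /^(\S+) ([^|]+)\|([^|]*)\|([^|]*)\|([^|]*)\|([\s\S]*)$/,
    keys: ["day", "time", "log_level", "app", "trace", "log"]
  }
];

// Returns an object of extracted fields, or null when no pattern matches.
function parseLine(line) {
  for (var i = 0; i < PATTERNS.length; i++) {
    var m = PATTERNS[i].re.exec(line);
    if (m) {
      var out = {};
      for (var j = 0; j < PATTERNS[i].keys.length; j++) {
        out[PATTERNS[i].keys[j]] = m[j + 1].trim();
      }
      return out;
    }
  }
  return null;
}
```

A fixed dissect tokenizer corresponds to having only the first entry; any line in the second layout then yields no fields at all.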

Could you please share your Filebeat configuration and a sample of the log data?

Main configuration file:
max_procs: 1
filebeat.config.inputs:
  enabled: true
  path: /home/admin/filebeat/config/njbank/*.yml
  reload.enabled: true
  reload.period: 10s
processors:
  - script:
      lang: javascript
      source: >
        function process(event) {
          var msg = event.Get("message");
          var obj = JSON.parse(msg);
          var server = obj.server_name;
          var content = obj.message;
          var tenantId = obj.tenant_id;
          var envName = obj.env_name;
          event.Put("message", msg);
          content = content.replace(/\t/g, " ");
          content = content.replace(/\0/g, " ");
          content = content.replace(/\\/g, '\\\\');
          content = content.replace(/"/g, '\\"');
          content = content.replace(/\n/g, ",");
          event.Put("content", content);
          event.Put("server_name", server);
          event.Put("tenant_id", tenantId);
          event.Put("env_name", envName);
        }
  - dissect:
      field: "content"
      tokenizer: "%{day} %{time} %{log_level} [%{thread}] [%{tid}] [%{classname}] %{hengxian} %{log|string}"
      target_prefix: ""
      ignore_failure: true
      overwrite_keys: true
queue.mem:
  events: 50000
  flush.min_events: 50000
  flush.timeout: 10s

output.doris:
  fenodes: ["http://11.113.208.48:8030","11.113.208.137:8030","11.113.208.104:8030"]
  user: "root"
  password: "doris@123"
  database: "biz_log"
  table: "real_log_table_njbank"
  codec_format_string: '{"time": "%{[day]} %{[time]}","message": "%{[log]}","log_level": "%{[log_level]}", "class_name": "%{[classname]}", "system_name": "%{[server_name]}", "thread_name": "%{[thread]}","tenant_id": "%{[tenant_id]}","env_name": "%{[env_name]}"}'
  headers:
    format: "json"
    read_json_by_line: "true"
    load_to_single_tablet: "true"
    column_separator: \x05
    max_filter_ratio: "0.2"

Kafka configuration file: below is the input configuration for one of the topics. I load every configuration file in the folder:

- clean_removed: false
  close_eof: false
  close_timeout: 5m
  enabled: true
- type: kafka
  hosts:
    - 11.113.224.40:9092
    - 11.113.224.20:9092
  group_id: "ton4"
  oldest: "oldest"
  topics: ["qifujt_njbank_yushu-res_stg1"]

Kafka sample data:
{"@timestamp":"2024-08-15T10:12:39.904Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.5.3"},"tenant_id":"njbank","_type":"monitor","app_name":"argus-apv","message":"2024-08-15 18:12:39.005|INFO |argus-apv|822ffbb21bdf4cbcaa563d79e4626ab4#0|NA|c.q.f.c.s.ChronusSdkFacade#tongshujun11|SUCCESS|4ms||172.26.5.252||","cluster":"default","server_name":"argus-apv","env_name":"stg1","log_source":"k8s"}
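
One plausible reading of the error, given the configuration and sample above: the `message` inside the sample is pipe-delimited, so the dissect tokenizer (spaces plus bracketed fields) does not match it. Because `ignore_failure: true` is set, the event passes through without `day`, `time`, `log`, `thread`, and so on. `codec_format_string` then references keys that do not exist, which would produce "key not found" for that event, while events that do match are still written. A minimal sketch of a guard that could run in a second script processor after dissect is shown below. `REQUIRED_KEYS` and `fillMissingKeys` are hypothetical names, and the code assumes `event.Get` returns null or undefined for an absent field.

```javascript
// Field names referenced by codec_format_string in the output.doris block.
var REQUIRED_KEYS = ["day", "time", "log", "log_level", "classname",
                     "server_name", "thread", "tenant_id", "env_name"];

// Puts an empty string into every required field that is absent,
// so the format string always finds the keys it references.
// Assumption: event.Get returns null or undefined for a missing field.
function fillMissingKeys(event) {
  for (var i = 0; i < REQUIRED_KEYS.length; i++) {
    var key = REQUIRED_KEYS[i];
    var value = event.Get(key);
    if (value === null || value === undefined) {
      event.Put(key, "");
    }
  }
}

// In a Filebeat script processor this would be wired up as:
//   function process(event) { fillMissingKeys(event); }
```

Empty placeholders only avoid the serialization error. Doris may still reject a row whose `time` value is blank, so dropping or separately routing unparsed events is an equally reasonable choice.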