[Solved] Importing into Doris with Logstash only works with CSV format; other formats are not supported


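I am using Logstash to collect Flink log data and import it into Doris, but I have run into a problem: the format parameter is not recognized. Could you provide a working configuration example for collecting logs? Here is a sample of the logs: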

2024-05-08 18:38:40,169 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2024-05-08 18:38:40,176 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils [] - Trying to select the network interface and address to use by connecting to the leading JobManager.
2024-05-08 18:38:40,176 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils [] - TaskManager will try to connect for PT10S before falling back to heuristics
2024-05-08 18:38:40,383 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - TaskManager will use hostname/address '172.17.0.1' (172.17.0.1) for communication.
2024-05-08 18:38:40,417 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address 172.17.0.1:0, bind address 0.0.0.0:0.
2024-05-08 18:38:40,868 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
2024-05-08 18:38:40,890 INFO akka.remote.RemoteActorRefProvider [] - Akka Cluster not in use - enabling unsafe features anyway because akka.remote.use-unsafe-remote-features-outside-cluster has been enabled.
2024-05-08 18:38:40,890 INFO akka.remote.Remoting [] - Starting remoting
2024-05-08 18:38:41,008 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@172.17.0.1:41843]
2024-05-08 18:38:41,101 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@172.17.0.1:41843
2024-05-08 18:38:41,118 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl [] - No metrics reporter configured, no metrics will be exposed/reported.
2024-05-08 18:38:41,121 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address 172.17.0.1:0, bind address 0.0.0.0:0.
2024-05-08 18:38:41,134 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
2024-05-08 18:38:41,137 INFO akka.remote.RemoteActorRefProvider [] - Akka Cluster not in use - enabling unsafe features anyway because akka.remote.use-unsafe-remote-features-outside-cluster has been enabled.
2024-05-08 18:38:41,137 INFO akka.remote.Remoting [] - Starting remoting
2024-05-08 18:38:41,143 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink-metrics@172.17.0.1:38265]
2024-05-08 18:38:41,151 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink-metrics@172.17.0.1:38265
2024-05-08 18:38:41,160 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService_172.17.0.1:41843-2e93d4 .
2024-05-08 18:38:41,169 INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Created BLOB cache storage directory /app/tmn/middleware/flink/blob/blobStore-3c889fdc-bb1a-4aac-b0e6-b4401ac3c054
2024-05-08 18:38:41,171 INFO org.apache.flink.runtime.blob.TransientBlobCache [] - Created BLOB cache storage directory /app/tmn/middleware/flink/blob/blobStore-7497532e-8e77-44e9-8b1f-f1626e85b3a7
2024-05-08 18:38:41,173 INFO org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2024-05-08 18:38:41,173 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Starting TaskManager with ResourceID: 172.17.0.1:41843-2e93d4
2024-05-08 18:38:41,189 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices [] - Temporary file directory '/app/tmn/middleware/flink/tmp': total 999 GB, usable 938 GB (93.89% usable)
2024-05-08 18:38:41,191 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Created a new FileChannelManager for spilling of task related data to disk (joins, sorting, ...). Used directories:
/app/tmn/middleware/flink/tmp/flink-io-7ff3d39f-63ec-4d4a-bf88-21ffea33193c

2 Answers

You can use this as a reference:

input {
    file {
        path => "/path/to/your/log"
    }
}

output {
    doris {
        http_hosts => [ "http://fehost1:http_port", "http://fehost2:http_port", "http://fehost3:http_port"]
        user => "your_username"
        password => "your_password"
        db => "your_db"
        table => "your_table"
        # doris stream load http headers
        headers => {
          "format" => "json"
          "read_json_by_line" => "true"
          "load_to_single_tablet" => "true"
        }
        # field mapping: doris field name => logstash field name
        # %{} to get a logstash field, [] for nested field such as [host][name] for host.name
        mapping => {
          "ts" => "%{@timestamp}"
          "host" => "%{[host][name]}"
          "path" => "%{[log][file][path]}"
          "message" => "%{message}"
        }
        log_request => true
        log_speed_interval => 10
    }
}

We will update this part of the documentation later.

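Thanks, the earlier problem is now solved. The main cause was that some of the log data contained \n, which made the import fail. Thank you very much for the config you posted as well!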
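
For anyone hitting the same \n issue, here is a minimal sketch of one way to keep each record on a single line before it reaches the doris output. The thread does not show the final working config, so this is an assumption rather than the fix that was actually applied. It uses the standard multiline codec to attach continuation lines (such as the directory path after "Used directories:" in the sample) to the preceding record, then a mutate gsub to replace embedded line breaks with a space. The path is the same placeholder as in the answer above.

```
input {
    file {
        path => "/path/to/your/log"
        # Assumption: a line that does not start with a timestamp is a
        # continuation of the previous log record.
        codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601} "
            negate => true
            what => "previous"
        }
    }
}

filter {
    # Replace embedded line breaks so that "message" is always a single line.
    mutate {
        gsub => [ "message", "[\r\n]+", " " ]
    }
}
```

The output section from the answer above can be kept unchanged. Whether a space is the right replacement depends on how the message column is queried later.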