Please enable Javascript to view the contents

Logstash 配置基础

 ·  ☕ 4 分钟

1. Logstash 的基本原理

Logstash 是一个用于数据传输和处理的组件。

通过插件的组合,Logstash 可以满足各种日志采集的场景:

logstash->elasticsearch
filebeat->logstash->elasticsearch
filebeat->kafka->logstash->elasticsearch
filebeat->redis->logstash->elasticsearch

2. Logstash 的基本配置

下面是一个 Logstash 的配置格式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 数据源,例如 Kafka、MySQL
input {
  
}
# 过滤器,用于处理数据,可选,例如丢掉空值、添加标签等
filter {

}
# 输出,例如 ES、文件
output {

}

input 数据源包括:

azure_event_hubs、beats、cloudwatch、couchdb_changes、dead_letter_queue、elastic_agent、elasticsearch、exec、file、ganglia、gelf、generator、github、google_cloud_storage、google_pubsub、graphite、heartbeat、http、http_poller、imap、irc、java_generator、java_stdin、jdbc、jms、jmx、kafka、kinesis、log4j、lumberjack、meetup、pipe、puppet_facter、rabbitmq、redis、relp、rss、s3、s3-sns-sqs、salesforce、snmp、snmptrap、sqlite、sqs、stdin、stomp、syslog、tcp、twitter、udp、unix、varnishlog、websocket、wmi、xmpp。

参考:ES 输入插件

filter 过滤插件包括:

age、aggregate、alter、bytes、cidr、cipher、clone、csv、date、de_dot、dissect、dns、drop、elapsed、elasticsearch、environment、extractnumbers、fingerprint、geoip、grok、http、i18n、java_uuid、jdbc_static、jdbc_streaming、json、json_encode、kv、memcached、metricize、metrics、mutate、prune、range、ruby、sleep、split、syslog_pri、threats_classifier、throttle、tld、translate、truncate、urldecode、useragent、uuid、wurfl_device_detection、xml

参考:ES 过滤插件

output 输出源包括:

boundary、circonus、cloudwatch、csv、datadog、datadog_metrics、dynatrace、elastic_app_search、elastic_workplace_search、elasticsearch、email、exec、file、ganglia、gelf、google_bigquery、google_cloud_storage、google_pubsub、graphite、graphtastic、http、influxdb、irc、java_stdout、juggernaut、kafka、librato、loggly、lumberjack、metriccatcher、mongodb、nagios、nagios_nsca、opentsdb、pagerduty、pipe、rabbitmq、redis、redmine、riak、riemann、s3、sink、sns、solr_http、sqs、statsd、stdout、stomp、syslog、tcp、timber、udp、webhdfs、websocket、xmpp、zabbix

参考:ES 输出插件

在配置时,需要根据使用场景,结合官方文档进行调试。

3. Logstash 的配置示例

3.1 标准出入到标准输出

1
2
3
4
5
6
input {
    stdin {}
}
output {
    stdout { codec => rubydebug }
}

3.2 文件到 ES

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
input {
    file {
        path => [ "/data/nginx/logs/nginx_access.log" ]
        start_position => "beginning"
        ignore_older => 0
    }
}
filter {
    # 使用 grok 插件对日志内容进行格式化
    grok {
        match => {
            "message" => "%{COMBINEDAPACHELOG}"
        }
    }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "nginx-access"
    }
}

3.3 Filebeats 到 ES

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
input { 
    beats { 
        port => 8088 
    }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "beats"
    }
}

3.4 Kafka 到 ES

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
input {
  kafka {
    bootstrap_servers => "${KAFKA_URL:localhost:2187}"
    topics => "${KAFKA_TOPIC:kafka-topic-test}"
    group_id => "${KAFKA_GROUP_ID:logstash-es}"
    consumer_threads => "${KAFKA_CONSUMER_THREADS:6}"
  }
}
filter {
  if ![message] {
    drop { }
  }
  mutate {
    remove_field => [ "headers", "result"]
  }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "kafka"
    }
}

微信公众号
作者
微信公众号