<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • ElasticSearch7.3學習(三十二)----logstash三大插件(input、filter、output)及其綜合示例

    1、 Logstash輸入插件

    1.1 input介紹

    logstash支持很多數據源,比如說file,http,jdbc,s3等等

    圖片上面只是一少部分。詳情見網址:https://www.elastic.co/guide/en/logstash/current/input-plugins.html

    1.2 標準輸入(Stdin)

    這種控制臺輸入前面已經介紹過了,這里就不解析了。

    鏈接:ElasticSearch7.3學習(三十一)----Logstash基礎學習

    input{
        stdin{
           
        }
    }
    output {
        stdout{
            codec=>rubydebug    
        }
    }

    1.3 讀取文件(File)

    比如說我存在一個nginx1.log文件,文件內容如下:

    注意:文件光標要指向下一行,不然最后一行可能讀取不到

    我想把文件內容打印至控制臺顯示。可在config/test1.conf里面添加如下內容,可采用通配符讀取多個文件

    input {
        file {
            path => ["E:/ElasticSearch/logstash-7.3.0/nginx*.log"]        
            start_position => "beginning"
        }
    }
    output {
        stdout {
        	codec=>rubydebug 
        }
    }

    具體的運行方式參照:ElasticSearch7.3學習(三十一)----Logstash基礎學習

    結果如下:

    1.4 實時更新文件

    假如說我們往nginx1.log下新增加一條數據,看下效果

    在生產環境下,服務一直在運行,日志文件一直在增加,logstash會自動讀取新增的數據

    默認情況下,logstash會從文件的結束位置開始讀取數據,也就是說logstash進程會以類似tail -f命令的形式逐行獲取數據。

    logstash使用一個名為filewatch的ruby gem庫來監聽文件變化,并通過一個叫.sincedb的數據庫文件來記錄被監聽的日志文件的讀取進度(時間戳),這個sincedb數據文件的默認路徑在 <path.data>/plugins/inputs/file下面,文件名類似于.sincedb_123456,而<path.data>表示logstash插件存儲目錄,默認是LOGSTASH_HOME/data。

    1.5 讀取TCP網絡數據

    下面的內部表示監聽端口的數據打印在控制臺,用的比較少,這里就不演示了。

    input {
      tcp {
        port => "1234"
      }
    }
    ?
    filter {
      grok {
        match => { "message" => "%{SYSLOGLINE}" }
      }
    }
    ?
    output {
        stdout{
            codec=>rubydebug
        }
    }

    2、Logstash過濾器插件(Filter)

    2.1 Filter介紹

    Logstash 可以幫利用它自己的Filter幫我們對數據進行解析,豐富,轉換等

    詳情請見網址:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

    下面簡單的介紹幾個常用的。

    2.2 Grok 正則捕獲

    grok是一個十分強大的logstash filter插件,他可以通過正則解析任意文本,將非結構化日志數據弄成結構化和方便查詢的結構。他是目前logstash 中解析非結構化日志數據最好的方式。

    Grok 的語法規則是:

    %{語法: 語義}

    例如輸入的內容為:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

    下面是一個組合匹配模式,它可以獲取上面輸入的所有內容:

    %{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
    • %{IP:clientip}匹配模式將獲得的結果為:clientip: 172.16.213.132
    • %{HTTPDATE:timestamp}匹配模式將獲得的結果為:timestamp: 07/Feb/2018:16:24:19 +0800
    • %{QS:referrer}匹配模式將獲得的結果為:referrer: "GET / HTTP/1.1"
    • %{NUMBER:response}匹配模式將獲得的結果為:NUMBER: "403"
    • %{NUMBER:bytes}匹配模式將獲得的結果為:NUMBER: "5039"

    通過上面這個組合匹配模式,我們將輸入的內容分成了五個部分,即五個字段,將輸入內容分割為不同的數據字段,這對于日后解析和查詢日志數據非常有用,這正是使用grok的目的。

    舉個例子:可在config/test1.conf里面添加如下內容,用法同上

    input{
        stdin{}
    }
    filter{
        grok{
            match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
        }
    }
    output{
        stdout{
            codec => "rubydebug"
        }
    }

    輸入內容:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

    結果如下:

    可以看到將一個長的字符串拆分為好幾個字段,這樣做的一個好處在于可以分割字符串,這樣的話可直接輸出至ElasticSearch。

    2.3 時間處理(Date)

    date插件是對于排序事件和回填舊數據尤其重要,它可以用來轉換日志記錄中的時間字段,變成LogStash::Timestamp對象,然后轉存到@timestamp字段里,這在之前已經做過簡單的介紹。 下面是date插件的一個配置示例:

    可在config/test1.conf里面添加如下內容,用法同上

    input{
        stdin{}
    }
    filter {
        grok {
            match => ["message", "%{HTTPDATE:timestamp}"]
        }
        date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    }
    output{
        stdout{
            codec => "rubydebug"
        }
    }

    輸入內容:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

    結果如下:

    可以看到將時間戳格式轉換為比較容易理解的格式。

    2.4 數據修改(Mutate)

    下面幾個用法就不單獨演示了,后面會有一個綜合示例演示所有的用法。

    (1)正則表達式替換匹配字段

    gsub可以通過正則表達式替換字段中匹配到的值,只對字符串字段有效,下面是一個關于mutate插件中gsub的示例(僅列出filter部分):

    filter {
        mutate {
            gsub => ["filed_name_1", "/" , "_"]
        }
    }

    這個示例表示將filed_name_1字段中所有"/"字符替換為"_"。

    (2)分隔符分割字符串為數組

    split可以通過指定的分隔符分割字段中的字符串為數組,下面是一個關于mutate插件中split的示例(僅列出filter部分):

    filter {
        mutate {
            split => ["filed_name_2", "|"]
        }
    }

    這個示例表示將filed_name_2字段以"|"為區間分隔為數組。

    (3)重命名字段

    rename可以實現重命名某個字段的功能,下面是一個關于mutate插件中rename的示例(僅列出filter部分):

    filter {
        mutate {
            rename => { "old_field" => "new_field" }
        }
    }

    這個示例表示將字段old_field重命名為new_field。

    (4)刪除字段

    remove_field可以實現刪除某個字段的功能,下面是一個關于mutate插件中remove_field的示例(僅列出filter部分):

    filter {
        mutate {
            remove_field  =>  ["timestamp"]
        }
    }

    這個示例表示將字段timestamp刪除。

    (5)GeoIP 地址查詢歸類

    將ip轉為地理信息

    filter {
        geoip {
            source => "ip_field"
        }
    }

    2.5 綜合示例

    下面給出一個綜合示例,將上面介紹到的用法集成到一個filter中使用。

    首先轉換成多個字段 --> 去除message字段 --> 日期格式轉換 --> 字段轉換類型 --> 字段重命名 --> replace替換字段 --> split按分割符拆分數據成為數組

    可在config/test1.conf里面添加如下內容,用法同上

    input {
        stdin {}
    }
    filter {
        grok {
            match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
            remove_field => [ "message" ]
       }
    date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    mutate {
              convert => [ "response","float" ]
               rename => { "response" => "response_new" }   
               gsub => ["referrer","\"",""]          
               split => ["clientip", "."]
            }
    }
    output {
        stdout {
            codec => "rubydebug"
        }

    輸入內容:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

    結果如下:

    可以看到上述規則已成功輸出。

    3、Logstash輸出插件(output)

    output是Logstash的最后階段,一個事件可以經過多個輸出,而一旦所有輸出處理完成,整個事件就執行完成。也就是說可以輸出到多個數據終點。

    一些常用的輸出包括:

    • file: 表示將日志數據寫入磁盤上的文件。

    • elasticsearch:表示將日志數據發送給Elasticsearch。Elasticsearch可以高效方便和易于查詢的保存數據。

    詳細請見網址:https://www.elastic.co/guide/en/logstash/current/output-plugins.html

    下面用法就不演示了,和上面大同小異。

    3.1 輸出到標準輸出(stdout)

    output {
        stdout {
            codec => rubydebug
        }
    }

    3.2 保存為文件(file)

    output {
        file {
            path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
        }
    }

    3、輸出到elasticsearch

    output {
        elasticsearch {
            host => ["192.168.1.1:9200","172.16.213.77:9200"]
            index => "logstash-%{+YYYY.MM.dd}"       
        }
    }
    • host:是一個數組類型的值,后面跟的值是elasticsearch節點的地址與端口,默認端口是9200。可添加多個地址。

    • index:寫入elasticsearch的索引的名稱,這里可以使用變量。Logstash提供了%{+YYYY.MM.dd}這種寫法。在語法解析的時候,看到以+ 號開頭的,就會自動認為后面是時間格式,嘗試用時間格式來解析后續字符串。這種以天為單位分割的寫法,可以很容易的刪除老的數據或者搜索指定時間范圍內的數據。此外,注意索引名中不能有大寫字母。

    • manage_template:用來設置是否開啟logstash自動管理模板功能,如果設置為false將關閉自動管理模板功能。如果我們自定義了模板,那么應該設置為false。

    • template_name:這個配置項用來設置在Elasticsearch中模板的名稱。

    4、綜合案例

    4.1 數據準備

    下面這個案例將綜合上面所有的內容

    實現實時讀取文件內容到本地的ElasticSearch

    首先初始文件nginx1.log空文件內容如下:

    將光標指向下一行。在config/test1.conf里面添加如下內容,用法同上

    input {
        file {
            path => ["E:/ElasticSearch/logstash-7.3.0/nginx*.log"]        
            start_position => "beginning"
        }
    }
    
    filter {
      grok {
            match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
            remove_field => [ "message" ]
       }
    	date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    	mutate {
               rename => { "response" => "response_new" }
               convert => [ "response","float" ]
               gsub => ["referrer","\"",""]
               remove_field => ["timestamp"]
               split => ["clientip", "."]
            }
    }
    
    output {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-%{+YYYY.MM.dd}"
        }
    }

    4.2 運行結果

    查看es中索引結果

    可以看到索引已成功建立 

    在往文件里添加內容

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

    結果如下:

    可以看到更新的數據已輸出至ES,并且規則已成功體現在數據上面。

    posted @ 2022-06-25 22:41  |舊市拾荒|  閱讀(156)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看