Categories
程式開發

容器日誌採集利器:Filebeat深度剖析與實踐



{“type”:”doc”,”content”:[{“type”:”heading”,”attrs”:{“align”:null,”level”:1}},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”在云原生时代和容器化浪潮中,容器的日志采集是一个看起来不起眼却又无法忽视的重要议题。对于容器日志采集我们常用的工具有Filebeat和Fluentd,两者对比各有优劣,相比基于ruby的Fluentd,考虑到可定制性,我们一般默认选择golang技术栈的Filebeat作为主力的日志采集agent。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”相比较传统的日志采集方式,容器化下单节点会运行更多的服务,负载也会有更短的生命周期,而这些更容易对日志采集agent造成压力,虽然Filebeat足够轻量级和高性能,但如果不了解Filebeat的机制,不合理的配置Filebeat,实际的生产环境使用中可能也会给我们带来意想不到的麻烦和难题。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”heading” ,”attrs”:{“align”:null,”level”:2},”content”:[{“type”:”text”,”text”:”整体架构”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”日志采集的功能看起来不复杂,主要功能无非就是找到配置的日志文件,然后读取并处理,发送至相应的后端如elasticsearch,kafka等。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”Filebeat官网有张示意图,如下所示:”}]},{“type”:”image”,”attrs”:{“src”:”https://static001.geekbang.org/infoq/58/58f9ddad84424ee284f7ef54870615c4.png”,”alt”:null,”title”: “”,”style”:[{“key”:”width”,”value”:”75%”},{“key”:”bordertype”,”value”:”none”}],”href”:””,”fromPaste”:false,”pastePass”:false}},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align “:null,”origin”:null}},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null} ,”content”:[{“type”:”text”,”text”:”针对每个日志文件,Filebeat都会启动一个harvester协程,即一个goroutine,在该goroutine中不停的读取日志文件,直到文件的EOF末尾。一个最简单的表示采集目录的input配置大概如下所示: “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”codeblock” ,”attrs”:{“lang”:”yaml”},”content”:[{“type”:”text”,”text”:”filebeat.inputs:n- type: logn # Paths that should be crawled and fetched. Glob based paths.n paths:n – /var/log/*.log”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”paragraph” ,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”不同的harvester goroutine采集到的日志数据都会发送至一个全局的队列queue中,queue的实现有两种:基于内存和基于磁盘的队列,目前基于磁盘的队列还是处于alpha阶段,Filebeat默认启用的是基于内存的缓存队列。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”每当队列中的数据缓存到一定的大小或者超过了定时的时间(默认1s),会被注册的client从队列中消费,发送至配置的后端。目前可以设置的client有kafka、elasticsearch、redis等。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”paragraph” ,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”虽然这一切看着挺简单,但在实际使用中,我们还是需要考虑更多的问题,例如:”}]},{“type”:”bulletedlist”,”content”:[{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”日志文件是如何被filbebeat发现又是如何被采集的?”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”Filebeat是如何确保日志采集发送到远程的存储中,不丢失一条数据的?”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”如果Filebeat挂掉,下次采集如何确保从上次的状态开始而不会重新采集所有日志?”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”Filebeat的内存或者cpu占用过多,该如何分析解决?”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”Filebeat如何支持docker和kubernetes,如何配置容器化下的日志采集?”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”想让Filebeat采集的日志发送至的后端存储,如果原生不支持,怎样定制化开发?”}]}]}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”paragraph” ,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”这些均需要对Filebeat有更深入的理解,下面让我们跟随Filebeat的源码一起探究其中的实现机制。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”heading” ,”attrs”:{“align”:null,”level”:2},”content”:[{“type”:”text”,”text”:”一条日志是如何被采集的”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”Filebeat源码归属于beats项目,而beats项目的设计初衷是为了采集各类的数据,所以beats抽象出了一个libbeat库,基于libbeat我们可以快速的开发实现一个采集的工具,除了Filebeat,还有像metricbeat、packetbeat等官方的项目也是在beats工程中。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”如果我们大致看一下代码就会发现,libbeat已经实现了内存缓存队列memqueue、几种output日志发送客户端,数据的过滤处理processor等通用功能,而Filebeat只需要实现日志文件的读取等和日志相关的逻辑即可。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”paragraph” ,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”从代码的实现角度来看,Filebeat大概可以分以下几个模块:”}]},{“type”:”bulletedlist”,”content”:[{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”input: 找到配置的日志文件,启动harvester”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”harvester: 读取文件,发送至spooler”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”spooler: 缓存日志数据,直到可以发送至publisher”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”publisher: 发送日志至后端,同时通知registrar”}]}]},{“type”:”listitem”,”content”:[{“type”:”paragraph”,”content”:[{“type”:”text”,”text”:”registrar: 记录日志文件被采集的状态”}]}]}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”heading” ,”attrs”:{“align”:null,”level”:3},”content”:[{“type”:”text”,”text”:”1. 找到日志文件”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”对于日志文件的采集和生命周期管理,Filebeat抽象出一个Crawler的结构体,”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”在Filebeat启动后,crawler会根据配置创建,然后遍历并运行每个input: “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”codeblock” ,”attrs”:{“lang”:”go”},”content”:[{“type”:”text”,”text”:”tfor _, inputConfig := range c.inputConfigs {ntterr := c.startInput(pipeline, inputConfig, r.GetStates())nt}”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”在每个input运行的逻辑里,首先会根据配置获取匹配的日志文件,需要注意的是,这里的匹配方式并非正则,而是采用linux glob的规则,和正则还是有一些区别。 “}]},{“type”:”codeblock”,”attrs”:{“lang”:”go”},”content”:[{“type”:”text”,”text”:”ttmatches, err := filepath.Glob(path)”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”获取到了所有匹配的日志文件之后,会经过一些复杂的过滤,例如如果配置了”},{“type”:”codeinline”,”content”:[{“type”:”text”,”text”:”exclude”},{“type”:”text”,”marks”:[{“type”:”italic”}],”text”:”files”}]},{“type”:”text”,”marks”:[{“type”:”italic”}],”text”:”則會忽略這類文件,同時還會查詢文件的狀態,如果文件的最近一次修改時間大於”},{“type”:”codeinline”,”content”:[{“type”:”text”,”text”:”ignore”}],”marks”:[{“type”:”italic”}]},{“type”:”codeinline”,”content”:[{“type”:”text”,”text”:”older”}]},{“type”:”text”,”text”:”的配置,也會不去採集該文件。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”heading” ,”attrs”:{“align”:null,”level”:3},”content”:[{“type”:”text”,”text”:”2. 读取日志文件”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”匹配到最终需要采集的日志文件之后,Filebeat会对每个文件启动harvester goroutine,在该goroutine中不停的读取日志,并发送给内存缓存队列memqueue。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”在”},{“type”:”codeinline”,”content”:[{“type”:”text”,”text”:”(h *Harvester) Run()”}]},{“type”:”text”,”text”:”方法中,我們可以看到這麼一個無限循環,省略了一些邏輯的代碼如下所示:”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”codeblock” ,”attrs”:{“lang”:”go”},”content”:[{“type”:”text”,”text”:”tfor {nttmessage, err := h.reader.Next()nttif err != nil {ntttswitch err {ntttcase ErrFileTruncate:nttttlogp.Info(“File was truncated. Begin reading file from offset 0: %s”, h.state.Source)ntttth.state.Offset = 0nttttfilesTruncated.Add(1)ntttcase ErrRemoved:nttttlogp.Info(“File was removed: %s. Closing because closeremoved is enabled.”, h.state.Source)ntttcase ErrRenamed:nttttlogp.Info(“File was renamed: %s. Closing because closerenamed is enabled.”, h.state.Source)ntttcase ErrClosed:nttttlogp.Info(“Reader was closed: %s. Closing.”, h.state.Source)ntttcase io.EOF:nttttlogp.Info(“End of file reached: %s. Closing because closeeof is enabled.”, h.state.Source)ntttcase ErrInactive:nttttlogp.Info(“File is inactive: %s. Closing because closeinactive of %v reached.”, h.state.Source, h.config.CloseInactive)ntttdefault:nttttlogp.Err(“Read line error: %v; File: %v”, err, h.state.Source)nttt}ntttreturn nilntt}ntt…nttif !h.sendEvent(data, forwarder) {ntttreturn nilntt}n}”}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”可以看到,reader.Next()方法会不停的读取日志,如果没有返回异常,则发送日志数据到缓存队列中。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”返回的异常有几种类型,除了读取到EOF外,还会有例如文件一段时间不活跃等情况发生会使harvester goroutine退出,不再采集该文件,并关闭文件句柄。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”Filebeat为了防止占据过多的采集日志文件的文件句柄,默认的”},{“type”:”codeinline”,”content”:[{“type”:”text”,”text”:”close_inactive”}]},{“type”:”text”,”text”:”參數為5min,如果日誌文件5min內沒有被修改,上面代碼會進入ErrInactive的case,之後該harvester goroutine會被關閉。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”这种场景下还需要注意的是,如果某个文件日志采集中被移除了,但是由于此时被Filebeat保持着文件句柄,文件占据的磁盘空间会被保留直到harvester goroutine结束。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”heading” ,”attrs”:{“align”:null,”level”:3},”content”:[{“type”:”text”,”text”:”3. 缓存队列 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null},”content”:[{“type”:”text”,”text”:”在memqueue被初始化时,Filebeat会根据配置”},{“type”:”codeinline”,”content”:[{“type”:”text”,”text”:”min_event”}]},{“type”:”text”,”text”:”是否大於1創建BufferingEventLoop或者DirectEventLoop,一般默認都是BufferingEventLoop,即帶緩衝的隊列。 “}]},{“type”:”paragraph”,”attrs”:{“indent”:0,”number”:0,”align”:null,”origin”:null}},{“type”:”codeblock” ,”attrs”:{“lang”:”go”},”content”:[{“type”:”text”,”text”:”type bufferingEventLoop struct {ntbroker Brokernntbuf batchBufferntflushList flushListnteventCount intnntminEvents intntmaxEvents intntflushTimeout time.Durationnnt// active broker API channelsntevents chan pushRequestntget chan getRequestntpubCancel chan producerCancelRequestnnt// ack handlingntacks chan int // ackloop -> eventloop : total number of events ACKed by outputsntschedACKS chan chanList // eventloop -> ackloop : active list of batches to be ackedntpendingACKs chanList // ordered list of active batches to be send to the ackloopntackSeq uint // ack batch sequence number to validate orderingnnt// buffer flush timer statenttimer *time.TimerntidleC <-chan time.Timen}n"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"BufferingEventLoop是一个实现了Broker、带有各种channel的结构,主要用于将日志发送至consumer消费。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"BufferingEventLoop的run方法中,同样是一个无限循环,这里可以认为是一个日志事件的调度中心。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock" ,"attrs":{"lang":"go"},"content":[{"type":"text","text":"tfor {nttselect {nttcase <-broker.done:ntttreturnnttcase req := <-l.events: // producer pushing new eventntttl.handleInsert(&req)nttcase req := <-l.get: // consumer asking for next batchntttl.handleConsumer(&req)nttcase count := <-l.acks:ntttl.handleACK(count)nttcase 0 {nttttl.flushBuffer()nttt}ntt}nt}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上文中harvester goroutine每次读取到日志数据之后,最终会被发送至bufferingEventLoop中的"},{"type":"codeinline","content":[{"type":"text","text":"events chan pushRequest"}]},{"type":"text","text":" channel,然後觸發上面"},{"type":"codeinline","content":[{"type":"text","text":"req := <-l.events"}]},{"type":"text","text":"的case,handleInsert方法會把數據添加至bufferingEventLoop的buf中,buf即memqueue實際緩存日誌數據的隊列,如果buf長度超過配置的最大值或者bufferingEventLoop中的timer定時器觸發了"},{"type":"codeinline","content":[{"type":"text","text":"case <-l.idleC"}]},{"type":"text","text":",均會調用flushBuffer()方法。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"flushBuffer()又会触发"},{"type":"codeinline","content":[{"type":"text","text":"req := <-l.get"}]},{"type":"text","text":"的case,然後運行handleConsumer方法,該方法中最重要的是這一句代碼:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock" ,"attrs":{"lang":"go"},"content":[{"type":"text","text":"treq.resp <- getResponse{ackChan, events}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这里获取到了consumer消费者的response channel,然后发送数据给这个channel。真正到这,才会触发consumer对memqueue的消费。所以,其实memqueue并非一直不停的在被consumer消费,而是在memqueue通知consumer的时候才被消费,我们可以理解为一种脉冲式的发送。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading" ,"attrs":{"align":null,"level":3},"content":[{"type":"text","text":"4. 消费队列"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"实际上,早在Filebeat初始化的时候,就已经创建了一个eventConsumer并在loop无限循环方法里试图从Broker中获取日志数据。"}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"tfor {nttif !paused && c.out != nil && consumer != nil && batch == nil {ntttout = c.out.workQueuentttqueueBatch, err := consumer.Get(c.out.batchSize)nttt...ntttbatch = newBatch(c.ctx, queueBatch, c.out.timeToLive)ntt}ntt...nttselect {nttcase <-c.done:ntttreturnnttcase sig := <-c.sig:nttthandleSignal(sig)nttcase out <- batch:ntttbatch = nilntt}nt}n"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上面consumer.Get就是消费者consumer从Broker中获取日志数据,然后发送至out的channel中被output client发送,我们看一下Get方法里的核心代码: "}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"tselect {ntcase c.broker.requests <- getRequest{sz: sz, resp: c.resp}:ntcase <-c.done:nttreturn nil, io.EOFnt}nnt// if request has been send, we do have to wait for a responsentresp := <-c.respntreturn &batch{nttconsumer: c,nttevents: resp.buf,nttack: resp.ack,nttstate: batchActive,nt}, niln"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"getRequest的结构如下:"}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"type getRequest struct {ntsz int // request sz events from the brokerntresp chan getResponse // channel to send response ton}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"getResponse的结构: "}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"type getResponse struct {ntack *ackChanntbuf []publisher.Eventn}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"getResponse里包含了日志的数据,而getRequest包含了一个发送至消费者的channel。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在上文bufferingEventLoop缓冲队列的handleConsumer方法里接收到的参数为getRequest,里面包含了consumer请求的getResponse channel。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"如果handleConsumer不发送数据,consumer.Get方法会一直阻塞在select中,直到flushBuffer,consumer的getResponse channel才会接收到日志数据。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align": null,"level":3},"content":[{"type":"text","text":"5. 发送日志"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在创建beats时,会创建一个clientWorker,clientWorker的run方法中,会不停的从consumer发送的channel里读取日志数据,然后调用client.Publish批量发送日志。 "}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"func (w *clientWorker) run() {ntfor !w.closed.Load() {nttfor batch := range w.qu {ntttif err := w.client.Publish(batch); err != nil {nttttreturnnttt}ntt}nt}n}n"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"libbeats库中包含了kafka、elasticsearch、logstash等几种client,它们均实现了client接口: "}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"type Client interface {ntClose() errorntPublish(publisher.Batch) errorntString() stringn}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"当然最重要的是实现Publish接口,然后将日志发送出去。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"实际上,Filebeat中日志数据在各种channel里流转的设计还是比较复杂和繁琐的,笔者也是研究了好久、画了很长的架构图才理清楚其中的逻辑。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这里抽出了一个简化后的图以供参考: "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image" ,"attrs":{"src":"https://static001.geekbang.org/infoq/1e/1edf1d23862b0cb9bea51bd81be2ae70.png","alt":null,"title":null,"style":null,"href ":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null, "origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{" type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"如何保证at least once"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Filebeat维护了一个registry文件在本地的磁盘,该registry文件维护了所有已经采集的日志文件的状态。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"实际上,每当日志数据发送至后端成功后,会返回ack事件。Filebeat启动了一个独立的registry协程负责监听该事件,接收到ack事件后会将日志文件的State状态更新至registry文件中,State中的Offset表示读取到的文件偏移量,所以Filebeat会保证Offset记录之前的日志数据肯定被后端的日志存储接收到。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"State结构如下所示: "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock" ,"attrs":{"lang":"go"},"content":[{"type":"text","text":"type State struct {ntId string json:"-" // local unique id to make comparison more efficientntFinished bool json:"-" // harvester statentFileinfo os.FileInfo json:"-" // the file infontSource string json:"source"ntOffset int64 json:"offset"ntTimestamp time.Time json:"timestamp"ntTTL time.Duration json:"ttl"ntType string json:"type"ntMeta map[string]string json:"meta"ntFileStateOS file.StateOSn}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"记录在registry文件中的数据大致如下所示: "}]},{"type":"codeblock","attrs":{"lang":"json"},"content":[{"type":"text","text":"[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"由于文件可能会被改名或移动,Filebeat会根据inode和设备号来标志每个日志文件。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"如果Filebeat异常重启,每次采集harvester启动的时候都会读取registry文件,从上次记录的状态继续采集,确保不会从头开始重复发送所有的日志文件。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"当然,如果日志发送过程中,还没来得及返回ack,Filebeat就挂掉,registry文件肯定不会更新至最新的状态,那么下次采集的时候,这部分的日志就会重复发送,所以这意味着Filebeat只能保证at least once,无法保证不重复发送。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"还有一个比较异常的情况是,linux下如果老文件被移除,新文件马上创建,很有可能它们有相同的inode,而由于Filebeat根据inode来标志文件记录采集的偏移,会导致registry里记录的其实是被移除的文件State状态,这样新的文件采集却从老的文件Offset开始,从而会遗漏日志数据。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text""text":"為了盡量避免inode被復用的情況,同時防止registry文件隨著時間增長越來越大,建議使用clean"}{"type":"text""marks":[{"type":"text""text":"为了尽量避免inode被复用的情况,同时防止registry文件随着时间增长越来越大,建议使用clean"}{"type":"text""marks":[{"type":"text""text":"為了盡量避免inode被復用的情況,同時防止registry文件隨著時間增長越來越大,建議使用clean"}{"type":"text""marks":[{"type":"text""text":"为了尽量避免inode被复用的情况,同时防止registry文件随着时间增长越来越大,建议使用clean"}{"type":"text""marks":[{"type":"italic"}],"text":"inactive和clean"},{"type":"text","text":"remove配置將長時間未更新或者被刪除的文件State從registry中移除。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"同时我们可以发现在harvester读取日志中,会更新registry的状态处理一些异常场景。例如,如果一个日志文件被清空,Filebeat会在下一次Reader.Next方法中返回ErrFileTruncate异常,将inode标志文件的Offset置为0,结束这次harvester,重新启动新的harvester,虽然文件不变,但是registry中的Offset为0,采集会从头开始。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"特别注意的是,如果使用容器部署Filebeat,需要将registry文件挂载到宿主机上,否则容器重启后registry文件丢失,会使Filebeat从头开始重复采集日志文件。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading" ,"attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Filebeat自动reload更新"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"目前Filebeat支持reload input配置,module配置,但reload的机制只有定时更新。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在配置中打开reload.enable之后,还可以配置reload.period表示自动reload配置的时间间隔。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Filebeat在启动时,会创建一个专门用于reload的协程。对于每个正在运行的harvester,Filebeat会将其加入一个全局的Runner列表,每次到了定时的间隔后,会触发一次配置文件的diff判断,如果是需要停止的加入stopRunner列表,然后逐个关闭,新的则加入startRunner列表,启动新的Runner。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align": null,"level":2},"content":[{"type":"text","text":"Filebeat对kubernetes的支持"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Filebeat官方文档提供了在kubernetes下基于daemonset的部署方式,最主要的一个配置如下所示: "}]},{"type":"codeblock","attrs":{"lang":"yaml"},"content":[{"type":"text","text":" - type: dockern containers.ids:n - "*"n processors:n - addkubernetesmetadata:n in_cluster: true"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"即设置输入input为docker类型。由于所有的容器的标准输出日志默认都在节点的"},{"type":"codeinline","content":[{"type":"text","text":"/var/lib/docker/containers//*-json.log"}]},{"type":"text","text":"路徑,所以本質上採集的是這類日誌文件。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"和传统的部署方式有所区别的是,如果服务部署在kubernetes上,我们查看和检索日志的维度不能仅仅局限于节点和服务,还需要有podName,containerName等,所以每条日志我们都需要打标增加kubernetes的元信息才发送至后端。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text""text":"Filebeat會在配置中增加了add"}{"type":"text""marks":[{"type":"text""text":"Filebeat会在配置中增加了add"}{"type":"text""marks":[{"type":"text""text":"Filebeat會在配置中增加了add"}{"type":"text""marks":[{"type":"text""text":"Filebeat会在配置中增加了add"}{"type":"text""marks":[{"type":"italic"}],"text":"kubernetes"},{"type":"text","text":"metadata的processor的情況下,啟動監聽kubernetes的watch服務,監聽所有kubernetes pod的變更,然後將歸屬本節點的pod最新的事件同步至本地的緩存中。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"节点上一旦发生容器的销毁创建,/var/lib/docker/containers/下会有目录的变动,Filebeat根据路径提取出containerId,再根据containerId从本地的缓存中找到pod信息,从而可以获取到podName、label等数据,并加到日志的元信息fields中。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Filebeat还有一个beta版的功能autodiscover,autodiscover的目的是把分散到不同节点上的Filebeat配置文件集中管理。目前也支持kubernetes作为provider,本质上还是监听kubernetes事件然后采集docker的标准输出文件。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"大致架构如下所示: "}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/0f/0f4f3db78223373cf29da36d38fb6d71.png","alt":null,"title": null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number ":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null, "origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{" type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"但是在实际生产环境使用中,仅采集容器的标准输出日志还是远远不够,我们往往还需要采集容器挂载出来的自定义日志目录,还需要控制每个服务的日志采集方式以及更多的定制化功能。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在轻舟容器云上,我们自研了一个监听kubernetes事件自动生成Filebeat配置的agent,通过CRD的方式,支持自定义容器内部日志目录、支持自定义fields、支持多行读取等功能。同时可在kubernetes上统一管理各种日志配置,而且无需用户感知pod的创建销毁和迁移,自动完成各种场景下的日志配置生成和更新。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading" ,"attrs":{"align":null,"level":2},"content":[{"type":"text","text":"性能分析与调优"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"虽然beats系列主打轻量级,虽然用golang写的Filebeat的内存占用确实比较基于jvm的logstash等好太多,但是事实告诉我们其实没那么简单。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"正常启动Filebeat,一般确实只会占用3、40MB内存,但是在轻舟容器云上偶发性的我们也会发现某些节点上的Filebeat容器内存占用超过配置的pod limit限制(一般设置为200MB),并且不停的触发的OOM。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"究其原因,一般容器化环境中,特别是裸机上运行的容器个数可能会比较多,导致创建大量的harvester去采集日志。如果没有很好的配置Filebeat,会有较大概率导致内存急剧上升。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"当然,Filebeat内存占据较大的部分还是memqueue,所有采集到的日志都会先发送至memqueue聚集,再通过output发送出去。每条日志的数据在Filebeat中都被组装为event结构,Filebeat默认配置的memqueue缓存的event个数为4096,可通过"},{"type":"codeinline","content":[{"type":"text","text":"queue.mem.events"}]},{"type":"text","text":"設置。默認最大的一條日誌的event大小限制為10MB,可通過"},{"type":"codeinline","content":[{"type":"text","text":"max_bytes"}]},{"type":"text","text":"設置。"},{"type":"codeinline","content":[{"type":"text","text":"4096 * 10MB = 40GB"}]},{"type":"text","text":",可以想像,極端場景下,Filebeat至少佔據40GB的內存。特別是配置了multiline多行模式的情況下,如果multiline配置有誤,單個event誤採集為上千條日誌的數據,很可能導致memqueue佔據了大量內存,致使內存爆炸。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"所以,合理的配置日志文件的匹配规则,限制单行日志大小,根据实际情况配置memqueue缓存的个数,才能在实际使用中规避Filebeat的内存占用过大的问题。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph" ,"attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align": null,"level":2},"content":[{"type":"text","text":"如何对Filebeat进行扩展开发"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"一般情况下Filebeat可满足大部分的日志采集需求,但是仍然避免不了一些特殊的场景需要我们对Filebeat进行定制化开发,当然Filebeat本身的设计也提供了良好的扩展性。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"beats目前只提供了像elasticsearch、kafka、logstash等几类output客户端,如果我们想要Filebeat直接发送至其他后端,需要定制化开发自己的output。同样,如果需要对日志做过滤处理或者增加元信息,也可以自制processor插件。 "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"无论是增加output还是写个processor,Filebeat提供的大体思路基本相同。一般来讲有3种方式: "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"numberedlist" ,"attrs":{"start":1,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"直接fork Filebeat,在现有的源码上开发。output或者processor都提供了类似Run、Stop等的接口,只需要实现该类接口,然后在init方法中注册相应的插件初始化方法即可。当然,由于golang中init方法是在import包时才被调用,所以需要在初始化Filebeat的代码中手动import。 "}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"复制一份Filebeat的main.go,import我们自研的插件库,然后重新编译。本质上和方式1区别不大。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"Filebeat还提供了基于golang plugin的插件机制,需要把自研的插件编译成.so共享链接库,然后在Filebeat启动参数中通过-plugin指定库所在路径。不过实际上一方面golang plugin还不够成熟稳定,一方面自研的插件依然需要依赖相同版本的libbeat库,而且还需要相同的golang版本编译,坑可能更多,不太推荐。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}