小奋斗 - 轻松学习从此开始!
IT小奋斗群 QQ群:62017228

Apache flume-ng做agent拦截器可以输出IP

最近在做flume日志收集工作,需要将每台日机器上的日志收集,然后发送到kafka,接着有专门的日志处理程序从kafka中进行消费,其中有个问题就是在消费的时候需要知道当前的日志来自哪台机器,查阅了flume文档发现没有提供该功能,因此就自己定一个flume拦截器。基本的流程就是将日志内容从flume event中取出,然后拼接上自定义的IP,然后再将body设置到event中。

架构部署图如下

image

--------------------------------------------------------------------------------------------------------------------------------------

flume拦截器代码如下

AppendIPInterceptor

package com.eju.ess;

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;

public class AppendIPInterceptor implements Interceptor {

    private String serviceId=null;

    public AppendIPInterceptor(String _serviceId){
        serviceId=_serviceId;
    }

    public Event intercept(Event arg0) {
        String eventBody = new String(arg0.getBody(),Charsets.UTF_8);
        String fmt="%s - %s";
        arg0.setBody(String.format(fmt, serviceId,eventBody).getBytes());
        return arg0;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {
        //~ null
    }

    public void initialize() {
        //~ null
    }

    /*public static class Builder implements Interceptor.Builder { public void configure(Context context) { } public Interceptor build() { return new AppendIPInterceptor(); } } */
}

--------------------------------------------------------------------------------------------------------------------------------------

在上面的代码中,close和initialize很容易看懂分别是拦截器关闭需要做的事情和拦截器初始化需要做的事情,在日志中增加IP这个业务中,不需要在这个两个方法中写任何内容。

主要的业务代码还是写在intercept中,在flume系统中每一条日志都是一个event,enven中包含heard、和body,增加IP这个业务主要是将event中的body取出,然后拼接上IP,接着将拼接后的结果在设置到event中的body中。

AppendIPBuilder

package com.eju.ess;

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

public class AppendIPBuilder implements Interceptor.Builder{

    private String serviceId=null;
    public void configure(Context context) {
        String configServiceId=context.getString("serviceId");
        serviceId=configServiceId;
    }

    public Interceptor build() {
        return new AppendIPInterceptor(serviceId);
    }

}

--------------------------------------------------------------------------------------------------------------------------------------

上面代码是一个builder,是用来启动拦截器,在flume.conf中需要指定该类的绝对路径。

flume.conf

--------------------------------------------------------------------------------------------------------------------------------------

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.eju.ess.AppendIPBuilder
a1.sources.r1.interceptors.i1.serviceId = 10.99.70.51

--------------------------------------------------------------------------------------------------------------------------------------

上面代码就是在flume.conf的配置,a1.sources.r1.interceptors.i1.type指定AppendIPBuilder的绝对路径,a1.sources.r1.interceptors.i1.serviceId配置一下机器的IP或者机器名字。

有同学可能会考虑性能问题,性能问题我确实还没有测试过,哈哈。

小奋斗文章

--------------------------------------------------------------------------------------------------------------------------------------

我来评几句
登录后评论

已发表评论数(0)