docker部署ELK(logstash、elasticsearch、kibana),监控日志

文章目录

[隐藏]

  • 一、每个单独部署
    • 1、elasticsearch部署
    • 2、logstash部署
    • 3、kibana部署
  • 二、docker-compose一起部署
    • 1、第一步在docker上安装ELK
    • 2、springboot日志系统配置logstash
    • 3、配置kibana,现在只要服务器通过指定的Tag打印日志,日志信息将会上传logstash解析,并且存储到elasticsearch,然后只需要kibana配置对应的elasticsearch的index即可看到所需的日志信息。
    • 4、接下讲一下我们工程中关于http接口日志的配置

由于是首次部署,第一次想着是单独部署logstash、elasticsearch、kibana,然后通过配置实现日志的监控,以下为部署步骤,但是最终失败,只能采取docker-compose来部署,以下内容可以略过,仅作为参考。

一、每个单独部署

先部署elasticsearch,因为logstash要设置日志输出位置,而输出位置正是elasticsearch,所以需要先部署启动elasticsearch,logstash才能部署并启动成功。

1、elasticsearch部署

1.制作elasticsearch镜像

docker pull docker.elastic.co/elasticsearch/elasticsearch:6.5.4  

2.创建并启动elasticsearch容器

docker run -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” -d docker.elastic.co/elasticsearch/elasticsearch:6.5.4  

通过本地浏览器访问localhost:9200,返回如下内容,则证明elasticsearch部署成功

官方参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

2、logstash部署

1.制作logstash镜像

docker pull docker.elastic.co/logstash/logstash:6.5.4  

2.创建并启动logstash容器

docker run --rm -it -p 4560:4560 -v /home/xijie/app/mylogstash/pipeline/:/usr/share/logstash/pipeline/ -d docker.elastic.co/logstash/logstash:6.5.4  

注意文件pipeline的映射,在pipeline中需要创建logstash的配置文件logstash-springboot.conf,该文件内容如下:

input {      tcp {          port => 4560          codec => json_lines      }  }  output{    elasticsearch { hosts => ["localhost:9200"] }    stdout { codec => rubydebug }  }  

注意output中设置了elasticsearch的连接地址:localhost:9200

官方参考文章:https://www.elastic.co/guide/en/logstash/current/docker-config.html

3、kibana部署

1.制作镜像

docker pull docker.elastic.co/kibana/kibana:6.5.4  

2.创建并启动容器

docker run -p 5601:5601 --mount type=bind,src=/home/xijie/app/mykibana/kibana.yml,dst=/usr/share/kibana/config/kibana.yml -d mysql/mysql-server:5.7  
二、docker-compose一起部署

参考文章:https://www.jianshu.com/p/c2f6e80b2756

1、第一步在docker上安装ELK

1.创建目录

mkdir /home/xijie/app/myelk  

2.从github上拉取部署elk所需资料

$ git clone https://github.com/deviantony/docker-elk.git  

下载完毕的资料目录如下:

3.进入刚下载的文件夹内

$ cd docker-elk  

4.通过docker-compose创建并启动容器

$ docker-compose up -d  

5.这个时候通过docker ps可以看到logstash、elasticsearch、kibana容器已经创建并且启动。

可以看该elk容器的默认端口为:

  • 5000: Logstash TCP input.
  • 9200: Elasticsearch HTTP
  • 9300: Elasticsearch TCP transport
  • 5601: Kibana

Kibana的web入口:http://localhost:5601

6.接下来进行参数配置,来实现springboot通过ELK查看日志信息。

修改logstash的配置,

进入logstash配置文件所在目录:

cd /home/xijie/app/myelk/dokcer-elk/logstash/pipeline  

打开配置文件:

vim logstash.conf  

修改内容如下:

input{          tcp {                  mode => "server"                  port => 5000                  codec => json_lines                  tags => ["data-http"]          }  }  filter{      json{          source => "message"          remove_field => ["message"]      }  }  output{      if "data-http" in [tags]{          elasticsearch{                  hosts=> ["elasticsearch:9200"]                  index => "data-http-%{+YYYY.MM.dd}"                  }          stdout{codec => rubydebug}      }  }  

注:

input标签为logstash进数据接口,filter标签为数据过滤器,output为数据出去接口。

input标签使用的是tcp,说明springboot客户端需要将日志传递到该接口,该接口正是logstash服务器接口。

filter将message字段去掉,只是为了当展示springboot的http请求接口的数据更加规整,而不是全部展示在message字段中。

output标签将数据传递给了elasticsearch,这里使用了if,当判断所出数据为所指定tag,才进行下面的配置。特别要注意index的配置,该值在kibana中需要使用,这里指定的index值为:data-http,要注意该值与tag是没有关系的,要注意区分。

参考文章:https://www.elastic.co/guide/en/logstash/current/index.html

这个时候重启elk即可。

进入docker-elk目录

/home/xijie/app/myelk/docker-elk  

然后重启

docker-compose restart  

关于elasticsearch、logstash、kibana的配置都在对应目录下的config文件夹中的.yml文件中,只需要修改该文件即可。

2、springboot日志系统配置logstash

1.pom中配置logstash

<dependency>          <groupId>net.logstash.logback</groupId>           <artifactId>logstash-logback-encoder</artifactId>           <version>5.2</version>  </dependency>  

2.application.properties文件中配置如下:

#logstash日志收集地址,即logstash服务器地址  logstash.ip_port=172.168.0.165:5000  #日志保存级别  logging.all.level=info  #日志保存地址,该值与logstash没关系,是当日志存在本地File文件内的文件夹地址  logging.levelfile=/home/logs/data-center-service  

3.logback.xml文件内容如下,该文件在resources目录

<?xml version="1.0" encoding="UTF-8"?>  <configuration debug="false">      roperty resource="application.properties">roperty>      <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->      roperty name="LOG_HOME" value="${logging.levelfile}" />      roperty name="LOG_LEVEL" value="${logging.all.level}" />      <!-- 控制台输出 -->      <appender name="STDOUT" >          <encoder >              <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->              attern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}:%L - %msg  %nattern>              <charset>UTF-8</charset>          </encoder>      </appender>      <!-- 按照每天生成日志文件 -->      <appender name="FILE"  >          <rollingPolicy >              <!--日志文件输出的文件名-->              <FileNamePattern>${LOG_HOME}/data-center-service.log.%d{yyyy-MM-dd}.log</FileNamePattern>              <!--日志文件保留天数-->              <MaxHistory>30</MaxHistory>          </rollingPolicy>          <encoder >              <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->              attern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%nattern>              <charset>UTF-8</charset>          </encoder>          <!--日志文件最大的大小-->          <triggeringPolicy >              <MaxFileSize>100MB</MaxFileSize>          </triggeringPolicy>      </appender>      <appender name="logstash" >          <destination>${logstash.ip_port}</destination>          <encoder charset="UTF-8"  />          <queueSize>1048576</queueSize>          <keepAliveDuration>5 minutes</keepAliveDuration>          <!--<customFields>{"application-name":"data-repo-interface"}</customFields>-->          <!--<filter >              <level>INFO</level>          </filter>          <filter >              <evaluator> <!– 默认为 ch.qos.logback.classic.boolex.JaninoEventEvaluator –>                  <expression>return message.contains("billing");</expression>              </evaluator>              <OnMatch>ACCEPT</OnMatch>              <OnMismatch>DENY</OnMismatch>          </filter>-->      </appender>        <logger name="elk_logger" level="INFO" additivity="false">          <appender-ref ref="logstash"/>      </logger>        <!--<logger name="com.zaxxer" level="${LOG_LEVEL}"/>-->      <!--<logger name="org.apache.ibatis" level="${LOG_LEVEL}"/>-->      <!--<logger name="org.mybatis.spring" level="${LOG_LEVEL}"/>-->      <!--<logger name="org.springframework" level="${LOG_LEVEL}"/>-->      <!--<logger name="java.sql.Connection" level="${LOG_LEVEL}"/>-->      <!--<logger name="java.sql.Statement" level="${LOG_LEVEL}"/>-->      <!--<logger name="java.sql.PreparedStatement" level="${LOG_LEVEL}"/>-->        <!-- 日志输出级别 -->      <root level="${LOG_LEVEL}">          <appender-ref ref="STDOUT" />          <appender-ref ref="FILE" />          <!--<appender-ref ref="logstash" />-->      </root>  </configuration>  

在上述配置文件中

<appender name="logstash" >          <destination>${logstash.ip_port}</destination>          <encoder charset="UTF-8"  />          <queueSize>1048576</queueSize>          <keepAliveDuration>5 minutes</keepAliveDuration>          <!--<customFields>{"application-name":"data-repo-interface"}</customFields>-->          <!--<filter >              <level>INFO</level>          </filter>          <filter >              <evaluator> <!– 默认为 ch.qos.logback.classic.boolex.JaninoEventEvaluator –>                  <expression>return message.contains("billing");</expression>              </evaluator>              <OnMatch>ACCEPT</OnMatch>              <OnMismatch>DENY</OnMismatch>          </filter>-->      </appender>        <logger name="elk_logger" level="INFO" additivity="false">          <appender-ref ref="logstash"/>      </logger>  

是logstash的主要配置。注意该配置中name=”elk_logger”,这样的指定,即

Logger elkLogger = LoggerFactory.getLogger("elk_logger");  

当使用slf4j生成Logger时,只要指定Tag为“elk_logger”的日志输出或打印,都会上传到logstash服务器,并存储到elasticsearch,用kibana查看。

3、配置kibana,现在只要服务器通过指定的Tag打印日志,日志信息将会上传logstash解析,并且存储到elasticsearch,然后只需要kibana配置对应的elasticsearch的index即可看到所需的日志信息。

通过浏览器访问kibana,http://localhost:5601

由于本人是通过虚拟机部署的服务,而且虚拟机的ip为172.168.0.165,所以在宿主机中通过访问http://172.168.0.165:5601即可。

然后点击左侧的management模块。

接下来点击Index Patterns,创建index pattern

接下来填入信息,在index pattern框中填入上述创建ElK时logstash配置文件中的index信息“data-http-*”

填写的同时,下面会显示出elasticsearch中已经有该index得数据,(注意:在配置kibana时,应该先运行服务器,让服务区打印出对应的index日志,elasticsearch中也保存了该日志,这时才能配置kibana成功)。

然后点击右侧的next step。

在Time Filter field name列表中选择@timestamp,然后按下Create index pattern按钮及创建成功。

这时点击左侧的Discover模块,选中data-http-*标签即可。在右侧就显示出了日志信息。如下图:

这样整个ELK与日志系统就搭建完毕了。

4、接下讲一下我们工程中关于http接口日志的配置

由于我们提供的服务器接口是上游,是给其他部门服务的,这里会牵扯到大量的专业数据,而为了避免数据问题的纠纷与接口错误问题的排查,所以需要将特定的接口请求数据保存,这样可以通过resquest与response来排查到底是哪个部门的问题。

1.使用了AOP对每次请求进行日志拦截。

定义SysLogAspect类,该类内容如下:

package com;    import com.alibaba.fastjson.JSON;  import com.alibaba.fastjson.serializer.SerializerFeature;  import com.SpringContextUtil;  import com.LoggerEntity;  import com.MailService;  import lombok.extern.slf4j.Slf4j;  import org.aspectj.lang.JoinPoint;  import org.aspectj.lang.ProceedingJoinPoint;  import org.aspectj.lang.annotation.*;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.stereotype.Component;  import org.springframework.web.context.request.RequestAttributes;  import org.springframework.web.context.request.RequestContextHolder;  import org.springframework.web.context.request.ServletRequestAttributes;    import javax.servlet.http.HttpServletRequest;  import javax.servlet.http.HttpServletResponse;  import java.io.PrintWriter;  import java.io.StringWriter;  import java.lang.reflect.Field;  import java.util.Date;  import java.util.HashMap;  import java.util.Map;    @Aspect  @Component  @Slf4j  public class SysLogAspect {        Logger elkLogger = LoggerFactory.getLogger("elk_logger");          /**       * 开始时间       */      private long startTime = 0L;      /**       * 结束时间       */      private long endTime = 0L;    //  @Autowired  //  private ILogService logService;        public static Map<String, Object> getKeyAndValue(Object obj) {          Map<String, Object> map = new HashMap<>();          // 得到类对象          Class userCla = (Class) obj.getClass();          /* 得到类中的所有属性集合 */          Field[] fs = userCla.getFields();          for (int i = 0; i < fs.length; i++) {              Field f = fs[i];              f.setAccessible(true); // 设置些属性是可以访问的              Object val = new Object();              try {                  val = f.get(obj);                  // 得到此属性的值                  map.put(f.getName(), val);// 设置键值              } catch (IllegalArgumentException e) {                  e.printStackTrace();              } catch (IllegalAccessException e) {                  e.printStackTrace();              }            }          return map;      }        //通过该注解来判断该接口请求是否进行cut      @Pointcut("@annotation(com.hongdaoai.datastore.common.annotation.SysLog)")      public void cutController() {      }        @Before("cutController()")      public void doBeforeInServiceLayer(JoinPoint joinPoint) {          log.debug("doBeforeInServiceLayer");          startTime = System.currentTimeMillis();      }        @After("cutController()")      public void doAfterInServiceLayer(JoinPoint joinPoint) {          log.debug("doAfterInServiceLayer");      }        @Around("cutController()")      public Object recordSysLog(ProceedingJoinPoint joinPoint) throws Throwable {            RequestAttributes ra = RequestContextHolder.getRequestAttributes();            ServletRequestAttributes sra = (ServletRequestAttributes) ra;            HttpServletRequest request = sra.getRequest();            HttpServletResponse response = sra.getResponse();            //ELK日志实体类          LoggerEntity elkLog = new LoggerEntity();            //应用程序名称          elkLog.setApplicationName(SpringContextUtil.getApplicationName());            //profile.active          elkLog.setProfileActive(SpringContextUtil.getActiveProfile());            // 请求的类名          elkLog.setClassName(joinPoint.getTarget().getClass().getName());            // 请求的方法名          elkLog.setMethodName(joinPoint.getSignature().getName());            //请求完整地址          elkLog.setUrl(request.getRequestURL().toString());            //请求URI          elkLog.setUri(request.getRequestURI());            //请求类型          elkLog.setRequestMethod(request.getMethod());            String queryString = request.getQueryString();            Object[] args = joinPoint.getArgs();            String params = "";            //获取请求参数集合并进行遍历拼接          if (args.length > 0) {                if ("POST".equals(request.getMethod())) {                    //param                  Object object = args[0];                    Map<String, Object> map = new HashMap<>();                    Map paramMap = getKeyAndValue(object);                    map.put("param", paramMap);                    if (args.length > 1) {                        object = args[1];                        Map bodyMap = getKeyAndValue(object);                        map.put("body", bodyMap);                  }                    params = JSON.toJSONStringWithDateFormat(map, "yyyy-MM-dd HH:mm:ss", SerializerFeature.UseSingleQuotes);                } else if ("GET".equals(request.getMethod())) {                    params = queryString;                }          }            //请求参数内容(param)          elkLog.setRequestParamData(params);            //客户端IP          elkLog.setClientIp(request.getRemoteAddr());            //终端请求方式,普通请求,ajax请求          elkLog.setRequestType(request.getHeader("X-Requested-With"));            //sessionId          elkLog.setSessionId(request.getRequestedSessionId());            //请求时间  //        elkLog.setRequestDateTime(new Date(startTime));          elkLog.setRequestDateTime(new Date());            Object result = null;            try {                // 环绕通知 ProceedingJoinPoint执行proceed方法的作用是让目标方法执行,这也是环绕通知和前置、后置通知方法的一个最大区别。              result = joinPoint.proceed();            } catch (Exception e) {                endTime = System.currentTimeMillis();                //请求耗时(单位:毫秒)              elkLog.setSpentTime(endTime - startTime);                //接口返回时间              elkLog.setResponseDateTime(new Date(endTime));                //请求时httpStatusCode代码              elkLog.setHttpStatusCode(String.valueOf(response.getStatus()));                String elkLogData = JSON.toJSONStringWithDateFormat(elkLog, "yyyy-MM-dd HH:mm:ss.SSS");                elkLogger.error(elkLogData);    //            log.error(e.getMessage(), e);                throw e;    //            return result;            }            endTime = System.currentTimeMillis();            //请求耗时(单位:毫秒)          elkLog.setSpentTime(endTime - startTime);            if (!elkLog.getMethodName().equals("getBookOriginalContent")) {                //接口返回数据              elkLog.setResponseData(JSON.toJSONStringWithDateFormat(result, "yyyy-MM-dd HH:mm:ss", SerializerFeature.UseSingleQuotes));            }            //接口返回时间          elkLog.setResponseDateTime(new Date(endTime));            //请求时httpStatusCode代码          elkLog.setHttpStatusCode(String.valueOf(response.getStatus()));    //        if (SpringContextUtil.getActiveProfile().equals("prod")) {          String s = JSON.toJSONStringWithDateFormat(elkLog, "yyyy-MM-dd HH:mm:ss.SSS");          elkLogger.info(s);            return result;      }      }  

SpringContextUtil类内容如下:

package com;    import org.springframework.beans.BeansException;  import org.springframework.context.ApplicationContext;  import org.springframework.context.ApplicationContextAware;  import org.springframework.stereotype.Component;    import java.util.Locale;    @Component  public class SpringContextUtil implements ApplicationContextAware {        private static ApplicationContext context = null;        // 传入线程中      public static <T> T getBean(String beanName) {          return (T) context.getBean(beanName);      }        // 国际化使用      public static String getMessage(String key) {          return context.getMessage(key, null, Locale.getDefault());      }        /// 获取应用程序名称      public static String getApplicationName() {          return context.getEnvironment().getProperty("spring.application.name");      }        /// 获取当前环境      public static String getActiveProfile() {          return context.getEnvironment().getActiveProfiles()[0];      }        /* (non Javadoc)       * @Title: setApplicationContext       * @Description: spring获取bean工具类       * @param applicationContext       * @throws BeansException       * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)       */      @Override      public void setApplicationContext(ApplicationContext applicationContext)              throws BeansException {          this.context = applicationContext;      }  }  

LoggerEntity类内容如下:

package com;    import lombok.Data;    import java.io.Serializable;  import java.util.Date;    //@Entity  //@Table(name = "t_logger_infos")  @Data  public class LoggerEntity implements Serializable {        //应用程序名      private String applicationName;        //spring.profiles.active      private String profileActive;        //客户端请求ip      private String clientIp;        //客户端请求路径      private String uri;        //客户端请求完整路径      private String url;        //请求方法名      private String methodName;        //请求类名      private String className;        //终端请求方式,普通请求,ajax请求      private String requestType;        //请求方式method,post,get等      private String requestMethod;        //请求参数内容,json      private String requestParamData;        //请求body参数内容,json      private String requestBodyData;        //请求接口唯一session标识      private String sessionId;        //请求时间      private Date requestDateTime;        //接口返回时间      private Date responseDateTime;        //接口返回数据json      private String responseData;        //请求时httpStatusCode代码,如:200,400,404等      private String httpStatusCode;        //请求耗时秒单位      private long spentTime;    }  

SysLog注解类如下:

package com;    import java.lang.annotation.*;    @Target(ElementType.METHOD)  @Retention(RetentionPolicy.RUNTIME)  @Documented  public @interface SysLog {        boolean isLog() default true;    }  

在SysLogAspect切面类中使用@SysLog注解来判断该接口是否进行日志传递。

所以在我们写的接口中只要在方法上加入该注解,即可控制日志是否上传。

如该Controller中所定义接口:

@SysLog(isLog = true)  @RequestMapping(value = "/getDataList", method = RequestMethod.POST)  public ApiResult getInputDataList(@RequestBody VersionVo vo) {      return success();  }  

原文出处:csdn -> https://blog.csdn.net/zhizhuodewo6/article/details/86630712

本站所发布的一切资源仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。如果侵犯你的利益,请发送邮箱到 [email protected],我们会很快的为您处理。