docker部署ELK(logstash、elasticsearch、kibana),监控日志
文章目录
[隐藏]
- 一、每个单独部署
- 1、elasticsearch部署
- 2、logstash部署
- 3、kibana部署
- 二、docker-compose一起部署
- 1、第一步在docker上安装ELK
- 2、springboot日志系统配置logstash
- 3、配置kibana,现在只要服务器通过指定的Tag打印日志,日志信息将会上传logstash解析,并且存储到elasticsearch,然后只需要kibana配置对应的elasticsearch的index即可看到所需的日志信息。
- 4、接下讲一下我们工程中关于http接口日志的配置
由于是首次部署,第一次想着是单独部署logstash、elasticsearch、kibana,然后通过配置实现日志的监控,以下为部署步骤,但是最终失败,只能采取docker-compose来部署,以下内容可以略过,仅作为参考。
一、每个单独部署
先部署elasticsearch,因为logstash要设置日志输出位置,而输出位置正是elasticsearch,所以需要先部署启动elasticsearch,logstash才能部署并启动成功。
1、elasticsearch部署
1.制作elasticsearch镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:6.5.4
2.创建并启动elasticsearch容器
docker run -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” -d docker.elastic.co/elasticsearch/elasticsearch:6.5.4
通过本地浏览器访问localhost:9200,返回如下内容,则证明elasticsearch部署成功
官方参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html
2、logstash部署
1.制作logstash镜像
docker pull docker.elastic.co/logstash/logstash:6.5.4
2.创建并启动logstash容器
docker run --rm -it -p 4560:4560 -v /home/xijie/app/mylogstash/pipeline/:/usr/share/logstash/pipeline/ -d docker.elastic.co/logstash/logstash:6.5.4
注意文件pipeline的映射,在pipeline中需要创建logstash的配置文件logstash-springboot.conf,该文件内容如下:
input { tcp { port => 4560 codec => json_lines } } output{ elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
注意output中设置了elasticsearch的连接地址:localhost:9200
官方参考文章:https://www.elastic.co/guide/en/logstash/current/docker-config.html
3、kibana部署
1.制作镜像
docker pull docker.elastic.co/kibana/kibana:6.5.4
2.创建并启动容器
docker run -p 5601:5601 --mount type=bind,src=/home/xijie/app/mykibana/kibana.yml,dst=/usr/share/kibana/config/kibana.yml -d mysql/mysql-server:5.7
二、docker-compose一起部署
参考文章:https://www.jianshu.com/p/c2f6e80b2756
1、第一步在docker上安装ELK
1.创建目录
mkdir /home/xijie/app/myelk
2.从github上拉取部署elk所需资料
$ git clone https://github.com/deviantony/docker-elk.git
下载完毕的资料目录如下:
3.进入刚下载的文件夹内
$ cd docker-elk
4.通过docker-compose创建并启动容器
$ docker-compose up -d
5.这个时候通过docker ps可以看到logstash、elasticsearch、kibana容器已经创建并且启动。
可以看该elk容器的默认端口为:
- 5000: Logstash TCP input.
- 9200: Elasticsearch HTTP
- 9300: Elasticsearch TCP transport
- 5601: Kibana
Kibana的web入口:http://localhost:5601
6.接下来进行参数配置,来实现springboot通过ELK查看日志信息。
修改logstash的配置,
进入logstash配置文件所在目录:
cd /home/xijie/app/myelk/dokcer-elk/logstash/pipeline
打开配置文件:
vim logstash.conf
修改内容如下:
input{ tcp { mode => "server" port => 5000 codec => json_lines tags => ["data-http"] } } filter{ json{ source => "message" remove_field => ["message"] } } output{ if "data-http" in [tags]{ elasticsearch{ hosts=> ["elasticsearch:9200"] index => "data-http-%{+YYYY.MM.dd}" } stdout{codec => rubydebug} } }
注:
input标签为logstash进数据接口,filter标签为数据过滤器,output为数据出去接口。
input标签使用的是tcp,说明springboot客户端需要将日志传递到该接口,该接口正是logstash服务器接口。
filter将message字段去掉,只是为了当展示springboot的http请求接口的数据更加规整,而不是全部展示在message字段中。
output标签将数据传递给了elasticsearch,这里使用了if,当判断所出数据为所指定tag,才进行下面的配置。特别要注意index的配置,该值在kibana中需要使用,这里指定的index值为:data-http,要注意该值与tag是没有关系的,要注意区分。
参考文章:https://www.elastic.co/guide/en/logstash/current/index.html
这个时候重启elk即可。
进入docker-elk目录
/home/xijie/app/myelk/docker-elk
然后重启
docker-compose restart
关于elasticsearch、logstash、kibana的配置都在对应目录下的config文件夹中的.yml文件中,只需要修改该文件即可。
2、springboot日志系统配置logstash
1.pom中配置logstash
<dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder</artifactId> <version>5.2</version> </dependency>
2.application.properties文件中配置如下:
#logstash日志收集地址,即logstash服务器地址 logstash.ip_port=172.168.0.165:5000 #日志保存级别 logging.all.level=info #日志保存地址,该值与logstash没关系,是当日志存在本地File文件内的文件夹地址 logging.levelfile=/home/logs/data-center-service
3.logback.xml文件内容如下,该文件在resources目录
<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false"> roperty resource="application.properties">roperty> <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径--> roperty name="LOG_HOME" value="${logging.levelfile}" /> roperty name="LOG_LEVEL" value="${logging.all.level}" /> <!-- 控制台输出 --> <appender name="STDOUT" > <encoder > <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> attern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}:%L - %msg %nattern> <charset>UTF-8</charset> </encoder> </appender> <!-- 按照每天生成日志文件 --> <appender name="FILE" > <rollingPolicy > <!--日志文件输出的文件名--> <FileNamePattern>${LOG_HOME}/data-center-service.log.%d{yyyy-MM-dd}.log</FileNamePattern> <!--日志文件保留天数--> <MaxHistory>30</MaxHistory> </rollingPolicy> <encoder > <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> attern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%nattern> <charset>UTF-8</charset> </encoder> <!--日志文件最大的大小--> <triggeringPolicy > <MaxFileSize>100MB</MaxFileSize> </triggeringPolicy> </appender> <appender name="logstash" > <destination>${logstash.ip_port}</destination> <encoder charset="UTF-8" /> <queueSize>1048576</queueSize> <keepAliveDuration>5 minutes</keepAliveDuration> <!--<customFields>{"application-name":"data-repo-interface"}</customFields>--> <!--<filter > <level>INFO</level> </filter> <filter > <evaluator> <!– 默认为 ch.qos.logback.classic.boolex.JaninoEventEvaluator –> <expression>return message.contains("billing");</expression> </evaluator> <OnMatch>ACCEPT</OnMatch> <OnMismatch>DENY</OnMismatch> </filter>--> </appender> <logger name="elk_logger" level="INFO" additivity="false"> <appender-ref ref="logstash"/> </logger> <!--<logger name="com.zaxxer" level="${LOG_LEVEL}"/>--> <!--<logger name="org.apache.ibatis" level="${LOG_LEVEL}"/>--> <!--<logger name="org.mybatis.spring" level="${LOG_LEVEL}"/>--> <!--<logger name="org.springframework" level="${LOG_LEVEL}"/>--> <!--<logger name="java.sql.Connection" level="${LOG_LEVEL}"/>--> <!--<logger name="java.sql.Statement" level="${LOG_LEVEL}"/>--> <!--<logger name="java.sql.PreparedStatement" level="${LOG_LEVEL}"/>--> <!-- 日志输出级别 --> <root level="${LOG_LEVEL}"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILE" /> <!--<appender-ref ref="logstash" />--> </root> </configuration>
在上述配置文件中
<appender name="logstash" > <destination>${logstash.ip_port}</destination> <encoder charset="UTF-8" /> <queueSize>1048576</queueSize> <keepAliveDuration>5 minutes</keepAliveDuration> <!--<customFields>{"application-name":"data-repo-interface"}</customFields>--> <!--<filter > <level>INFO</level> </filter> <filter > <evaluator> <!– 默认为 ch.qos.logback.classic.boolex.JaninoEventEvaluator –> <expression>return message.contains("billing");</expression> </evaluator> <OnMatch>ACCEPT</OnMatch> <OnMismatch>DENY</OnMismatch> </filter>--> </appender> <logger name="elk_logger" level="INFO" additivity="false"> <appender-ref ref="logstash"/> </logger>
是logstash的主要配置。注意该配置中name=”elk_logger”,这样的指定,即
Logger elkLogger = LoggerFactory.getLogger("elk_logger");
当使用slf4j生成Logger时,只要指定Tag为“elk_logger”的日志输出或打印,都会上传到logstash服务器,并存储到elasticsearch,用kibana查看。
3、配置kibana,现在只要服务器通过指定的Tag打印日志,日志信息将会上传logstash解析,并且存储到elasticsearch,然后只需要kibana配置对应的elasticsearch的index即可看到所需的日志信息。
通过浏览器访问kibana,http://localhost:5601
由于本人是通过虚拟机部署的服务,而且虚拟机的ip为172.168.0.165,所以在宿主机中通过访问http://172.168.0.165:5601即可。
然后点击左侧的management模块。
接下来点击Index Patterns,创建index pattern
接下来填入信息,在index pattern框中填入上述创建ElK时logstash配置文件中的index信息“data-http-*”
填写的同时,下面会显示出elasticsearch中已经有该index得数据,(注意:在配置kibana时,应该先运行服务器,让服务区打印出对应的index日志,elasticsearch中也保存了该日志,这时才能配置kibana成功)。
然后点击右侧的next step。
在Time Filter field name列表中选择@timestamp,然后按下Create index pattern按钮及创建成功。
这时点击左侧的Discover模块,选中data-http-*标签即可。在右侧就显示出了日志信息。如下图:
这样整个ELK与日志系统就搭建完毕了。
4、接下讲一下我们工程中关于http接口日志的配置
由于我们提供的服务器接口是上游,是给其他部门服务的,这里会牵扯到大量的专业数据,而为了避免数据问题的纠纷与接口错误问题的排查,所以需要将特定的接口请求数据保存,这样可以通过resquest与response来排查到底是哪个部门的问题。
1.使用了AOP对每次请求进行日志拦截。
定义SysLogAspect类,该类内容如下:
package com; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.SpringContextUtil; import com.LoggerEntity; import com.MailService; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Field; import java.util.Date; import java.util.HashMap; import java.util.Map; @Aspect @Component @Slf4j public class SysLogAspect { Logger elkLogger = LoggerFactory.getLogger("elk_logger"); /** * 开始时间 */ private long startTime = 0L; /** * 结束时间 */ private long endTime = 0L; // @Autowired // private ILogService logService; public static Map<String, Object> getKeyAndValue(Object obj) { Map<String, Object> map = new HashMap<>(); // 得到类对象 Class userCla = (Class) obj.getClass(); /* 得到类中的所有属性集合 */ Field[] fs = userCla.getFields(); for (int i = 0; i < fs.length; i++) { Field f = fs[i]; f.setAccessible(true); // 设置些属性是可以访问的 Object val = new Object(); try { val = f.get(obj); // 得到此属性的值 map.put(f.getName(), val);// 设置键值 } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } return map; } //通过该注解来判断该接口请求是否进行cut @Pointcut("@annotation(com.hongdaoai.datastore.common.annotation.SysLog)") public void cutController() { } @Before("cutController()") public void doBeforeInServiceLayer(JoinPoint joinPoint) { log.debug("doBeforeInServiceLayer"); startTime = System.currentTimeMillis(); } @After("cutController()") public void doAfterInServiceLayer(JoinPoint joinPoint) { log.debug("doAfterInServiceLayer"); } @Around("cutController()") public Object recordSysLog(ProceedingJoinPoint joinPoint) throws Throwable { RequestAttributes ra = RequestContextHolder.getRequestAttributes(); ServletRequestAttributes sra = (ServletRequestAttributes) ra; HttpServletRequest request = sra.getRequest(); HttpServletResponse response = sra.getResponse(); //ELK日志实体类 LoggerEntity elkLog = new LoggerEntity(); //应用程序名称 elkLog.setApplicationName(SpringContextUtil.getApplicationName()); //profile.active elkLog.setProfileActive(SpringContextUtil.getActiveProfile()); // 请求的类名 elkLog.setClassName(joinPoint.getTarget().getClass().getName()); // 请求的方法名 elkLog.setMethodName(joinPoint.getSignature().getName()); //请求完整地址 elkLog.setUrl(request.getRequestURL().toString()); //请求URI elkLog.setUri(request.getRequestURI()); //请求类型 elkLog.setRequestMethod(request.getMethod()); String queryString = request.getQueryString(); Object[] args = joinPoint.getArgs(); String params = ""; //获取请求参数集合并进行遍历拼接 if (args.length > 0) { if ("POST".equals(request.getMethod())) { //param Object object = args[0]; Map<String, Object> map = new HashMap<>(); Map paramMap = getKeyAndValue(object); map.put("param", paramMap); if (args.length > 1) { object = args[1]; Map bodyMap = getKeyAndValue(object); map.put("body", bodyMap); } params = JSON.toJSONStringWithDateFormat(map, "yyyy-MM-dd HH:mm:ss", SerializerFeature.UseSingleQuotes); } else if ("GET".equals(request.getMethod())) { params = queryString; } } //请求参数内容(param) elkLog.setRequestParamData(params); //客户端IP elkLog.setClientIp(request.getRemoteAddr()); //终端请求方式,普通请求,ajax请求 elkLog.setRequestType(request.getHeader("X-Requested-With")); //sessionId elkLog.setSessionId(request.getRequestedSessionId()); //请求时间 // elkLog.setRequestDateTime(new Date(startTime)); elkLog.setRequestDateTime(new Date()); Object result = null; try { // 环绕通知 ProceedingJoinPoint执行proceed方法的作用是让目标方法执行,这也是环绕通知和前置、后置通知方法的一个最大区别。 result = joinPoint.proceed(); } catch (Exception e) { endTime = System.currentTimeMillis(); //请求耗时(单位:毫秒) elkLog.setSpentTime(endTime - startTime); //接口返回时间 elkLog.setResponseDateTime(new Date(endTime)); //请求时httpStatusCode代码 elkLog.setHttpStatusCode(String.valueOf(response.getStatus())); String elkLogData = JSON.toJSONStringWithDateFormat(elkLog, "yyyy-MM-dd HH:mm:ss.SSS"); elkLogger.error(elkLogData); // log.error(e.getMessage(), e); throw e; // return result; } endTime = System.currentTimeMillis(); //请求耗时(单位:毫秒) elkLog.setSpentTime(endTime - startTime); if (!elkLog.getMethodName().equals("getBookOriginalContent")) { //接口返回数据 elkLog.setResponseData(JSON.toJSONStringWithDateFormat(result, "yyyy-MM-dd HH:mm:ss", SerializerFeature.UseSingleQuotes)); } //接口返回时间 elkLog.setResponseDateTime(new Date(endTime)); //请求时httpStatusCode代码 elkLog.setHttpStatusCode(String.valueOf(response.getStatus())); // if (SpringContextUtil.getActiveProfile().equals("prod")) { String s = JSON.toJSONStringWithDateFormat(elkLog, "yyyy-MM-dd HH:mm:ss.SSS"); elkLogger.info(s); return result; } }
SpringContextUtil类内容如下:
package com; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Locale; @Component public class SpringContextUtil implements ApplicationContextAware { private static ApplicationContext context = null; // 传入线程中 public static <T> T getBean(String beanName) { return (T) context.getBean(beanName); } // 国际化使用 public static String getMessage(String key) { return context.getMessage(key, null, Locale.getDefault()); } /// 获取应用程序名称 public static String getApplicationName() { return context.getEnvironment().getProperty("spring.application.name"); } /// 获取当前环境 public static String getActiveProfile() { return context.getEnvironment().getActiveProfiles()[0]; } /* (non Javadoc) * @Title: setApplicationContext * @Description: spring获取bean工具类 * @param applicationContext * @throws BeansException * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context = applicationContext; } }
LoggerEntity类内容如下:
package com; import lombok.Data; import java.io.Serializable; import java.util.Date; //@Entity //@Table(name = "t_logger_infos") @Data public class LoggerEntity implements Serializable { //应用程序名 private String applicationName; //spring.profiles.active private String profileActive; //客户端请求ip private String clientIp; //客户端请求路径 private String uri; //客户端请求完整路径 private String url; //请求方法名 private String methodName; //请求类名 private String className; //终端请求方式,普通请求,ajax请求 private String requestType; //请求方式method,post,get等 private String requestMethod; //请求参数内容,json private String requestParamData; //请求body参数内容,json private String requestBodyData; //请求接口唯一session标识 private String sessionId; //请求时间 private Date requestDateTime; //接口返回时间 private Date responseDateTime; //接口返回数据json private String responseData; //请求时httpStatusCode代码,如:200,400,404等 private String httpStatusCode; //请求耗时秒单位 private long spentTime; }
SysLog注解类如下:
package com; import java.lang.annotation.*; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface SysLog { boolean isLog() default true; }
在SysLogAspect切面类中使用@SysLog注解来判断该接口是否进行日志传递。
所以在我们写的接口中只要在方法上加入该注解,即可控制日志是否上传。
如该Controller中所定义接口:
@SysLog(isLog = true) @RequestMapping(value = "/getDataList", method = RequestMethod.POST) public ApiResult getInputDataList(@RequestBody VersionVo vo) { return success(); }
原文出处:csdn -> https://blog.csdn.net/zhizhuodewo6/article/details/86630712