RocketMQ4.5.X 实现修改生产者消费者日志保存路径

RocketMQ修改生产者消费者日志保存路径

rocket默认是将所有日志文件保存到user.home的对于win系统就是C盘了。

1.修改RocketMQ中CLientLogger.class的源码,把经过更改的源码重新打包后,去自己的maven仓库替换rocketmq-client.-4.5.X.jar。

2.对于生产者的启动类里需要配置JVM系统属性:

如果不设置logUserSlf4j为true的话,启动生产者的时候会报找不到日志配置文件的警告。

3.对于消费者,仅仅添加rq.lordir(日志保存路径)就行了。

当然,以上情况是开发中每个开发人员需要独立设置的,部署到线上的时候可以直接在源码中写路径而不是系统属性。

RocketMQ 日志操作

官网:http://rocketmq.apache.org/docs/logappender-example/

应用:将程序日志输出到rocketmq,消费端可读取日志数据进行相应处理

导入 jar 包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-logappender</artifactId>
    <version>4.7.1</version>
</dependency>

相关类

RocketmqLogbackAppender:发送到rocketmq

public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
    private String tag;                   //标签
    private String topic;                 //发送的topic
    private String nameServerAddress;     //namesrv地址
    private String producerGroup;         //发送群组
    private MQProducer producer;
    private Layout layout;                //布局方式
    private PreSerializationTransformer<ILoggingEvent> pst = new LoggingEventPreSerializationTransformer();

AsyncAppender:异步发送

public class AsyncAppender extends AsyncAppenderBase<ILoggingEvent> {
    boolean includeCallerData = false;

AsyncAppenderBase

public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E>
implements AppenderAttachable<E> {
    AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
    BlockingQueue<E> blockingQueue;
    public static final int DEFAULT_QUEUE_SIZE = 256;
    int queueSize = DEFAULT_QUEUE_SIZE;
    int appenderCount = 0;
    static final int UNDEFINED = -1;
    int discardingThreshold = UNDEFINED;
    boolean neverBlock = false;
    Worker worker = new Worker();
    public static final int DEFAULT_MAX_FLUSH_TIME = 1000;
    int maxFlushTime = DEFAULT_MAX_FLUSH_TIME;

AppenderAttachable:引用、删除appender

public interface AppenderAttachable<E> {
    void addAppender(Appender<E> newAppender);
    boolean isAttached(Appender<E> appender);

    Iterator<Appender<E>> iteratorForAppenders();
    Appender<E> getAppender(String name);

    void detachAndStopAllAppenders();
    boolean detachAppender(Appender<E> appender);
    boolean detachAppender(String name);
}

日志配置

logback.xml:放在resources目录下

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>

    <property name="log.path" value="e:/hhhh/"/>
    <property name="console.pattern" value="%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
    <property name="file.pattern" value="%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } --- [%t] %-40.40logger{39} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${console.pattern}</pattern>
        </encoder>
    </appender>

    <appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
        <tag>tag</tag>
        <topic>topic-log</topic>
        <producerGroup>log-group</producerGroup>
        <nameServerAddress>172.18.0.10:9876</nameServerAddress>
        <layout>
            <pattern>%date %p %t - %m%n</pattern>
        </layout>
    </appender>

    <!-- 异步发送 -->
    <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
        <queueSize>1024</queueSize>
        <discardingThreshold>80</discardingThreshold>
        <maxFlushTime>2000</maxFlushTime>
        <neverBlock>true</neverBlock>
        <appender-ref ref="mqAppender1"/>
    </appender>

    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="mqAsyncAppender1"/>
    </root>
</configuration>

将springboot应用打包,发布成docker容器

创建容器

#namesrv
docker run -it -d --net fixed --ip 172.18.0.10 -p 9876:9876 \
-e JAVA_OPT="-server -Xms256m -Xmx256m -Xmn128m" \
--name namesrv lihu12344/rocketmq:4.7.1 bash bin/mqnamesrv

#broker
docker run -it -d --net fixed --ip 172.18.0.20 \
-e NAMESRV_ADDR="172.18.0.10:9876" \
-e JAVA_OPT="-server -Xms512m -Xmx512m -Xmn256m" \
-v /usr/rocketmq/single/broker.conf:/home/rocketmq/rcketmq-4.7.1/conf/broker.conf \
--name broker lihu12344/rocketmq:4.7.1 \
bash bin/mqbroker autoCreateTopicEnable=true -c conf/broker.conf

#应用程序
docker run -it -d --net fixed --ip 172.18.0.21 -p 8080:8080 --name rocketmq-log rocketmq-log

rocketmq监控

docker run -it -d --net fixed --ip 172.18.0.4 -p 8008:8080 \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.18.0.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
--name rocketmq-console styletang/rocketmq-console-ng

使用测试

192.168.57.127:8008

查看topic

查看topic-log message信息

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • RocketMQ-延迟消息的处理流程介绍

    概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息: 预设值的延迟时间间隔为: 1s. 5s. 10s. 30s. 1m. 2m. 3m. 4m. 5m. 6m. 7m. 8m. 9m. 10m. 20m. 30m. 1h. 2h: 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间: broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到

  • 基于RocketMQ推拉模式详解

    消费者客户端有两种方式从消息中间件获取消息并消费.严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现. 通过 Pull 不断轮询 Broker 获取消息.当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息. 1.概述 1.1.PULL方式 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息:采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个

  • RocketMQ4.5.2 修改mqnamesrv 和 mqbroker的日志路径操作

    此解决方案是针对window的,因为日志默认保存路径在C盘,linux忽略. 学习RocketMQ过程中,总是出现 com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small. 这

  • RocketMQ存储文件的实现

    RocketMQ存储路径默认是${ROCKRTMQ_HOME}/store,主要存储消息.主题对应的消息队列的索引等. 1.概述 查看其目录文件 commitlog:消息的存储目录 config:运行期间一些配置信息 consumequeue:消息消费队列存储目录 index:消息索引文件存储目录 abort:如果存在abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出时删除 checkpoint:文件检测点.存储commitlog文件最后一次刷盘时间戳.consumeque

  • RocketMQ 延时级别配置方式

    RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等. 其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推. 如何配置: 在服务器端(rocketmq-broker端)的属性配置文件中加入以下行: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 描述了各级别与延时时

  • 解决rocketmq-client日志保存路径的问题

    rocketmq-client日志保存路径 由于使用rocketmq-client会生成一个rocketmq_client.log日志文件,一般默认存于系统盘内,且持续增长速度较快,为便于清理且避免影响操作系统资源使用,建议手动设置其日志存储路径为当前程序运行目录下的logs文件夹. 参考操作方式: 系统启动时代码显式设置全局变量"rocketmq.client.logRoot", 示例: System.setProperty("rocketmq.client.logRoot

  • rocketmq如何修改存储路径

    一.下载rocketmq对应版本源码 修改消息存储路径需要修改rocketmq源码,因为rocketmq取的默认路径是user.home路径,也就是用户的根目录,如下所示 直接修改用户的user.home比较麻烦,我们打算直接修改源码里写死的路径,然后重新打包 下载rocketmq源码可以去GitHub,路径为https://github.com/apache/rocketmq 如果要下4.7.1版本的源码包可以选择对应release包 例如使用的rocketmq版本为4.7.1,则下载路径为h

  • RocketMQ4.5.X 实现修改生产者消费者日志保存路径

    RocketMQ修改生产者消费者日志保存路径 rocket默认是将所有日志文件保存到user.home的对于win系统就是C盘了. 1.修改RocketMQ中CLientLogger.class的源码,把经过更改的源码重新打包后,去自己的maven仓库替换rocketmq-client.-4.5.X.jar. 2.对于生产者的启动类里需要配置JVM系统属性: 如果不设置logUserSlf4j为true的话,启动生产者的时候会报找不到日志配置文件的警告. 3.对于消费者,仅仅添加rq.lordi

  • Java多线程之线程通信生产者消费者模式及等待唤醒机制代码详解

    前言 前面的例子都是多个线程在做相同的操作,比如4个线程都对共享数据做tickets–操作.大多情况下,程序中需要不同的线程做不同的事,比如一个线程对共享变量做tickets++操作,另一个线程对共享变量做tickets–操作,这就是大名鼎鼎的生产者和消费者模式. 正文 一,生产者-消费者模式也是多线程 生产者和消费者模式也是多线程的范例.所以其编程需要遵循多线程的规矩. 首先,既然是多线程,就必然要使用同步.上回说到,synchronized关键字在修饰函数的时候,使用的是"this"

  • 理解生产者消费者模型及在Python编程中的运用实例

    什么是生产者消费者模型 在 工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产 生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型.结构图如下: 生产者消费者模型的优点: 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,

  • java解决单缓冲生产者消费者问题示例

    经典的生产者消费者问题模拟.此程序模拟最简单情形--单缓冲.为模拟实际情况,consume item和produce item时加了延时,可以通过修改延时模拟不同的生成消费速率. [code] [/co/** * single buffer consumer-producer problem. * by xu(xusiwei1236@163.com). * */public class ConsumerProducer { static Object buffer = null; static

  • Java实现生产者消费者问题与读者写者问题详解

    1.生产者消费者问题 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品.解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步:(2)在生产者和消费者之间建立一个管道.第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式.第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强. 同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性.常

  • Python semaphore evevt生产者消费者模型原理解析

    线程锁相当于同时只能有一个线程申请锁,有的场景无数据修改互斥要求可以同时让多个线程同时运行,且需要限制并发线程数量时可以使用信号量 import threading, time, queue def test(name): semaphore.acquire() #获取信号量锁 print('my name is %s' %name) time.sleep(1) semaphore.release() #释放信号量锁 semaphore = threading.BoundedSemaphore(

  • Java中生产者消费者问题总结

    生产者-消费者算是并发编程中常见的问题.依靠缓冲区我们可以实现生产者与消费者之间的解耦.生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西.这样我们避免生产者想要交付数据给消费者,但消费者此时还无法接受数据这样的情况发生. wait notify 这个问题其实就是线程间的通讯,所以要注意的是不能同时读写.生产者在缓冲区满的时候不生产,等待:消费者在缓冲区为空的时候不消费,等待.比较经典的做法是wait和notify. 生产者线程执行15次set操作 public class Produc

  • Go语言实现一个简单生产者消费者模型

    目录 一.生产者消费者模型 二.Go语言实现 1.无缓冲channel 2.有缓冲channel 三.实际应用 简介:介绍生产者消费者模型,及go简单实现的demo. 一.生产者消费者模型 生产者消费者模型:某个模块(函数等〉负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.协程.线程.进程等).产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者. 单单抽象出生产者和消费者,还够不上是生产者消费者模型.该模式还需要有一个缓冲区处于生产者和消费者之间

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

  • JAVA生产者消费者(线程同步)代码学习示例

    一.问题描述 生产者消费者问题是一个典型的线程同步问题.生产者生产商品放到容器中,容器有一定的容量(只能顺序放,先放后拿),消费者消费商品,当容器满了后,生产者等待,当容器为空时,消费者等待.当生产者将商品放入容器后,通知消费者:当消费者拿走商品后,通知生产者. 二.解决方案 对容器资源加锁,当取得锁后,才能对互斥资源进行操作. 复制代码 代码如下: public class ProducerConsumerTest { public static void main(String []args

随机推荐