RocketMq深入分析讲解两种削峰方式

目录
  • 何时需要削峰
  • 通过消息队列的削峰方法有两种
  • 消费延时控流
  • 总结

何时需要削峰

当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求

通过消息队列的削峰方法有两种

控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度

通过消费者参数控制消费速度

先分析那些参数对控制消费速度有作用

1.PullInterval: 设置消费端,拉取mq消息的间隔时间。

注意:该时间算起时间是rocketMq消费者从broker消息后算起。经过PullInterval再次向broker拉去消息

源码分析:

首先需要了解rocketMq的消息拉去过程

拉去消息的类

PullMessageService

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
    .newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "PullMessageServiceScheduledThread");
        }
    });
    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }
    public void executeTaskLater(final Runnable r, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }
    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }
    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
    @Override
    public void shutdown(boolean interrupt) {
        super.shutdown(interrupt);
                       ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
                       }
                       @Override
                       public String getServiceName() {
                       return PullMessageService.class.getSimpleName();
                       }
                       }

继承自ServiceThread,这是一个单线程执行的service,不断获取阻塞队列中的pullRequest,进行消息拉取。

executePullRequestLater会延时将pullrequest放入到pullRequestQueue,达到延时拉去的目的。

那么PullInterval参数就是根据这个功能发挥的作用,在消费者拉去消息成功的回调

 PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

在 case found的情况下,也就是拉取到消息的q情况,在PullInterval>0的情况下,会延时投递到pullRequestQueue中,实现拉取消息的间隔

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }

2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数

消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。

消费端每次pull到消息总数=PullBatchSize*监听队列数

源码分析

消费者拉取消息时

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中

会执行

 this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );

其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。

3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。

源码分析:

还是在消费者拉取消息成功时

  boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

通过consumeMessageService执行

默认情况下是并发消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

  @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

其中consumeExecutor初始化

this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

对象线程池最大和核心线程数。对于顺序消费ConsumeMessageOrderlyService也会使用最大和最小线程数这两个参数,只是消费时会锁定队列。

以上三种情况:是针对参数配置,来调整消费速度。

除了这三种情况外还有两种服务部署情况,可以调整消费速度:

4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数

可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:修改tocpic的逻辑队列数量

5.消费端节点部署数量 :

部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。

消费延时控流

针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:

单节点部署:

ConsumInterval :延时时间单位毫秒

ConcurrentThreadNumber:消费端线程数量

MaxRate :理论每秒处理数量

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

200 = 1 / 0.1 * 20

由上可知,理论上可以将并发消费控制在 200 以下

如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。

如下延时流控代码:

 /**
     * 测试mq 并发 接受
     */
    @Component
    @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
    class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
        @SneakyThrows
        @Override
        public void onMessage(LikeWritingParams params) {
            System.out.println("睡上0.1秒");
            Thread.sleep(100);
            long begin = System.currentTimeMillis();
            System.out.println("mq消费速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
            //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
            long end = System.currentTimeMillis();
          //  System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
        }
        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
            defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
            defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
            defaultMQPushConsumer.setPullBatchSize(32);     //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
        }
    }

注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。

但是要注意实际操作中并发流控实际是默认存在的,

spring boot 消费端默认配置

this.consumeThreadMin = 20;

this.consumeThreadMax = 20;

this.pullInterval = 0L;

this.pullBatchSize = 32;

若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000

这里默认拉去的速度1s内远大于1000

注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000

所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义

使用rokcetMq支持的延时消息也可以实现消息的延时消费,通过对delayLevel对应的时间进行配置为我们的需求。为不同的消息设置不同delayLevel,达到延时消费的目的。

总结

rocketMq 肖锋流控两种方式:

并发流控:就是根据业务流控速率要求,来调整topic 消费队列数量(read queue),消费端部署节点,消费端拉去间隔时间,消费端消费线程数量等,来达到要求的速率内

延时消费流控:就是在消费端延时消费消息(sleep),具体延时多少要根据业务要求速率,和消费端线程数量,和节点部署数量来控制

到此这篇关于RocketMq深入分析讲解两种削峰方式的文章就介绍到这了,更多相关RocketMq削峰内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 一文彻底掌握RocketMQ 的存储模型

    目录 RocketMQ简介 1 整体概览 2 数据文件 3 消费文件 4 索引文件 5 写到最后 RocketMQ简介 RocketMQ有Producer.Consumer.NameSrv.Broker四个部分.其中Broker用于存储消息,维护消息队列和订阅关系,是RocketMQ四个部分中最重要的一个部分,并且RocketMQ的高性能就是依赖于Broker模块的底层存储模型实现的.所以搞清楚Broker的存储模型是学习RocketMQ最重要的一步. RocketMQ 优异的性能表现,必然绕不

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • 图文并茂讲解RocketMQ消息类别

    目录 1.同步消息 2.异步消息 3.单向消息 1.同步消息 即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功) 生产者: public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("19

  • RocketMq深入分析讲解两种削峰方式

    目录 何时需要削峰 通过消息队列的削峰方法有两种 消费延时控流 总结 何时需要削峰 当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求 通过消息队列的削峰方法有两种 控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度 通过消费者参数控制消费速度 先分析那些参数对控制消费速度有作用 1.PullInterval: 设置消费端,拉取mq消息的间隔时间. 注意:该时间算起时间是rocketMq消费者从broker消息后算起.经过PullInt

  • 浅谈Spring的两种事务定义方式

    一.声明式 这种方法不需要对原有的业务做任何修改,通过在XML文件中定义需要拦截方法的匹配即可完成配置,要求是,业务处理中的方法的命名要有规律,比如setXxx,xxxUpdate等等.详细配置如下: <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="

  • 浅谈Java的两种多线程实现方式

    本文介绍了浅谈Java的两种多线程实现方式,分享给大家.具有如下: 一.创建多线程的两种方式 Java中,有两种方式可以创建多线程: 1 通过继承Thread类,重写Thread的run()方法,将线程运行的逻辑放在其中 2 通过实现Runnable接口,实例化Thread类 在实际应用中,我们经常用到多线程,如车站的售票系统,车站的各个售票口相当于各个线程.当我们做这个系统的时候可能会想到两种方式来实现,继承Thread类或实现Runnable接口,现在看一下这两种方式实现的两种结果. 程序1

  • 详解IOS开发中图片上传时两种图片压缩方式的比较

    IOS 图片上传时两种图片压缩方式的比较 上传图片不全面的想法:把图片保存到本地,然后把图片的路径上传到服务器,最后又由服务器把路径返回,这种方式不具有扩展性,如果用户换了手机,那么新手机的沙盒中就没有服务器返回的图片路径了,此时就无法获取之前已经上传了的头像了,在项目中明显的不可行. 上传图片的正确方式:上传头像到服务器一般是将图片NSData上传到服务器,服务器返回一个图片NSString地址,之后再将NSString的路径转为url并通过url请求去更新用户头像(用户头像此时更新的便是NS

  • PHP守护进程的两种常见实现方式详解

    本文实例讲述了PHP守护进程的两种常见实现方式.分享给大家供大家参考,具体如下: 第一种方式,借助 nohup 和 &  配合使用. 在命令后面加上 & 符号, 可以让启动的进程转到后台运行,而不占用控制台,控制台还可以再运行其他命令,这里我使用一个while死循环来做演示,代码如下 <?php while(true){ echo time().PHP_EOL; sleep(3); } 用 & 方式来启动该进程 [root@localhost php]# php deadlo

  • BootStrap的两种模态框方式

    bootstrap的弹出层 bootstrap弹出层有多种触发方式,以下是我用到的几种方式: 1.方法一:button中属性触发 注意:button中的data-target内容应该和要和弹出层中的id保持一致 data-target="#mymodal-data"--– id="mymodal-data" <!--在button上绑定触发弹出层的属性--> <button class="btn btn-primary delete&qu

  • 详解Spring 两种注入的方式(Set和构造)实例

    依赖注入是指对象之间关系的控制权由应用代码中转到外部容器.Spring框架主要提供了Set注入和构造注入两种依赖注入方式. 1:Set注入指的就是在接受注入的类中定义一个要被注入的类型的一个set方法,并在参数中定义需要注入的元素.Set注入式一种装配Bean属性的直接方法,但Set注入的一个缺点就是它假设了所有的可变属性都可以通过set方法访问到,无法清晰地表示哪些属性是必须的,哪些属性是可选的. 2:构造注入是在接收注入的类中定义一个构造方法,并在构造方法中定义需要注入的参数.构造注入方式的

  • Nginx使用的php-fpm的两种进程管理方式及优化

    PS:前段时间配置php-fpm的时候,无意中发现原来它还有两种进程管理方式.与Apache类似,它的进程数也是可以根据设置分为动态和静态的. php-fpm目前主要又两个分支,分别对应于php-5.2.x的版本和php-5.3.x的版本.在5.2.x的版本中,php-fpm.conf使用的是xml格式,而在新的5.3.x版本中,则是和php.ini一样的配置风格. 在5.2.x版本中,php-fpm.conf中对于进程管理号称是有两种风格,一种是静态(static)的,一种是类似于apache

  • PHP实现两种排课方式

    两种排课方式: 固定每周的固定时间上课(例:共上20节,每周六.周日早上8点-10点上课.假如今天周六凌晨1点,那么排课也需要从今天开始)总共上几个周,每周上课时间比较个性化(例:共上三周,第一周周一周二早上8点-10点上课:第二周周三周四下午8点-10点上课:第三周周日中午11点-12点上课.) 第一种排课比较好实现,简要代码如下: /** * 生成日期列表 * * @param int $startDate 开始日期 时间戳格式 * @param array $timeList 课时计划列表

  • C++中两种字符串定义方式和区别介绍

    目录 前言 概念 定义方式 计算机的存储方式 总结 前言 在学习字符串这方面的内容时,发现字符串定义的两种方式虽然内容相同但还是有细微的区别 概念 字符串就是一串用双引号连接起来的字符串字面值,简称为字符串 定义方式 上代码 char acter1[]="Hello world";//第一种定义方式 char acter2[]={'H','e','l','l','o','w','o','r','l','d'}; 第二种定义方式 此时并不能看出这两种定义方式的区别,拿到VS调试器看看 很

随机推荐