原文出处:WebRTC Pacing之IntervalBudget分析

WebRTC中Pacing等模块需要按照指定的码率发送报文,保证码率稳定,会用到IntervalBudget这个类,这个类是控制码率平稳的核心。本篇将介绍IntervalBudget这个类。

1. IntervalBudget原理

IntervalBudget顾名思义,就是一段时间内的发送码率预算。 IntervalBudget根据时间流逝增加budget,报文发送后减少budget,每次发送报文前判断剩余budget是否足够,如果不足则取消本次发送。

举个例子 :

IntervalBudget这个类比较小,因此这里直接贴上其声明:

class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps);
  IntervalBudget(int initial_target_rate_kbps, bool can_build_up_underuse);
  // 设置目标发送码率
  void set_target_rate_kbps(int target_rate_kbps);

  // 时间流逝后增加budget
  void IncreaseBudget(int64_t delta_time_ms);
  // 发送数据后减少budget
  void UseBudget(size_t bytes);

  // 剩余budget
  size_t bytes_remaining() const;
  // 剩余budget占当前窗口数据量比例
  double budget_ratio() const;
  // 目标发送码率
  int target_rate_kbps() const;

 private:
  // 设置的目标码率,按照这个码率控制数据发送
  int target_rate_kbps_;
  // 窗口内(500ms)对应的最大字节数=窗口大小*target_rate_kbps_/8
  int64_t max_bytes_in_budget_;
  // 剩余可发送字节数,限制范围:[-max_bytes_in_budget_, max_bytes_in_budget_]
  int64_t bytes_remaining_;
  // 上个周期underuse,本周期是否可以借用上个周期的剩余量
  bool can_build_up_underuse_;
};

2. budget增加

如果距离上次更新时间相隔delta_time_ms,那么随着时间流逝,那么这段时间增长的budget为delta_time_ms * target_rate_kbps_

void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) {
  int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
  // 一般来说,can_build_up_underuse_都会关闭,关于这个开关的介绍见最后一部分介绍
  if (bytes_remaining_ < 0 || can_build_up_underuse_) {
    // We overused last interval, compensate this interval.
    // 如果上次发送的过多(bytes_remaining_ < 0),那么本次发送的数据量会变少
    // 如果开启can_build_up_underuse_,则表明可以累积之前没有用完的预算
    bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_);
  } else {
    // If we underused last interval we can't use it this interval.
    // 1) 如果上次的budget没有用完(bytes_remaining_ > 0),如果没有设置can_build_up_underuse_
    // 不会对上次的补偿,直接清空所有预算,开始新的一轮

    // 2) 如果设置了can_build_up_underuse_标志,那意味着要考虑上次的underuse,
    // 如果上次没有发送完,则本次需要补偿,见上面if逻辑
    bytes_remaining_ = std::min(bytes, max_bytes_in_budget_);
  }
}

3. 减少budget

发送了数据后需要减少budget,直接减去发送字节数即可:

void IntervalBudget::UseBudget(size_t bytes) {
  bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),
                              -max_bytes_in_budget_);
}

4. 谈谈IntervalBudget窗口大小

这里的窗口目前主要是用来控制can_build_up_underuse_开关打开下,build_up的上限。 考虑到存在这样的情况,随着时间流逝,我们每5ms都有5ms的budget可供使用,但是并不是每一个5ms我们都能够完全使用掉这5ms的budget,这里称作underuse。因此,can_build _up_underuse_开关允许我们将这些没有用完的预算累计起来,以供后续使用。kWindowMs = 500ms意味着我们可以累积500ms这么多没有用完的预算。

打开这个开关好处在于某些时刻,我们短时间内可供发送的预算更多,在码率抖动较大的时候,我们可以更快地将数据发送出去。带来的缺陷是,短时间内的码率控制不够平滑, 在一些低带宽场景影响更大。


原文出处:WebRTC Pacing之RoundRobinPacketQueue分析

WebRTC中Pacing模块实现了一个优先级队列RoundRobinPacketQueue,它可以push不同优先级的报文,然后可以按照优先级pop并发送出去。RoundRobinPacketQueue总结来说有几点需要注意:

  1. RoundRobinPacketQueue每一路Stream单独调度,每一路stream有不同优先级,由其packet决定,优先级相同时,发送数据量少的stream有更高的调度优先级
  2. Stream的优先级保存在RoundRobinPacketQueue中,每次发送报文后都需要更新
  3. 同一个Stream的报文保存在一个优先级队列中,队列中的数据重传包有更高优先级,先插入的数据有更高优先级
  4. 音频报可能不参与调度

1. 对外接口以及相关定义

老规矩,了解一个类的功能前,我们先了解其对外接口:

class RoundRobinPacketQueue {
 public:
  ...
  // 插入不同优先级的报文
  void Push(int priority,
            Timestamp enqueue_time,
            uint64_t enqueue_order,
            std::unique_ptr<RtpPacketToSend> packet);
  // 弹出一个即将发送的报文
  std::unique_ptr<RtpPacketToSend> Pop();
  // queue是否empty
  bool Empty() const;
  // 报文总数
  size_t SizeInPackets() const;
  // 报文总字节数
  DataSize Size() const;
  // 下一个pop的报文如果是音频包,则返回该包时间戳,非音频包则返回nullopt
  absl::optional<Timestamp> LeadingAudioPacketEnqueueTime() const;

  // 队列中最旧的报文时间戳
  Timestamp OldestEnqueueTime() const;
  // 队列中的报文按照当前码率发送出去所用到的时间
  TimeDelta AverageQueueTime() const;
  // 更新内部状态
  void UpdateQueueTime(Timestamp now);
  // 暂停queue处理
  void SetPauseState(bool paused, Timestamp now);
  // 计算是考虑传输层overhead,默认不考虑
  void SetIncludeOverhead();
  // 设置传输层的overhead大小
  void SetTransportOverhead(DataSize overhead_per_packet);
  ...
};

最重要的接口应该数push和pop,从接口上我们大致了解了其功能,但是实际上其内部处理还是要深入到这个函数里面。

Stream

RoundRobinPacketQueue 中定义了Stream来区分不同的流,每一路有自己的一个优先级队列,以及这路流对应的优先级:

struct Stream {
  ...
  // 已经发送的报文总大小,发送较少的stream通常会优先调度
  DataSize size;
  // 该路流对应的ssrc
  uint32_t ssrc;
  // 一个根据StreamPrioKey确定优先级的优先队列
  PriorityPacketQueue packet_queue;

  // stream_priorities_的一个iterator,可以理解成一个指针
  std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
};

StreamPrioKey

Stream之间优先级比较主要是根据优先级以及stream中报文总大小,因此这里增加了一个StreamPrioKey来简化优先级比较:

struct StreamPrioKey {
  StreamPrioKey(int priority, DataSize size)
      : priority(priority), size(size) {}

  // 优先级不等时比较优先级,优先级相等时,发送过较少的stream优先级更高
  bool operator<(const StreamPrioKey& other) const {
    if (priority != other.priority)
      return priority < other.priority;
    return size < other.size;
  }

  const int priority;
  const DataSize size;
};

PriorityPacketQueue

PriorityPacketQueue是一个继承于std::priority_queue<QueuedPacket>的优先队列,不同stream的数据最终是保存在该优先队列中,建议大家先看看标准库里面的优先队列std::priority_queue的操作。 该优先队列内的数据比较方法:

bool RoundRobinPacketQueue::QueuedPacket::operator<(
    const RoundRobinPacketQueue::QueuedPacket& other) const {
  // 先判断优先级,优先级相等时判断重传,重传包更高优先级,否则判断插入顺序(order)
  if (priority_ != other.priority_)
    return priority_ > other.priority_;
  if (is_retransmission_ != other.is_retransmission_)
    return other.is_retransmission_;

  return enqueue_order_ > other.enqueue_order_;
}

最后看看RoundRobinPacketQueue几个重要的成员变量:

// StreamPrioKey-ssrc map,Stream中有保存该iterator
std::multimap<StreamPrioKey, uint32_t> stream_priorities_;

// SSRC Stream map
std::map<uint32_t, Stream> streams_;

// 报文入队时间,用于找到最老的报文时间
std::multiset<Timestamp> enqueue_times_;

// 存一个报文的queue
absl::optional<QueuedPacket> single_packet_queue_;

2. Push数据

RoundRobinPacketQueue对插入的数据稍微做了一下封装,QueuedPacket增加了优先级、入队列时间、次序、是否是重传、入队时间set的iterator,数据存在

void RoundRobinPacketQueue::Push(int priority,
                                 Timestamp enqueue_time,
                                 uint64_t enqueue_order,
                                 std::unique_ptr<RtpPacketToSend> packet) {
  // 没有存储任何报文时直接存在single_packet_queue_中,不进队列
  if (size_packets_ == 0) {
    // Single packet fast-path.
    single_packet_queue_.emplace(
        QueuedPacket(priority, enqueue_time, enqueue_order,
                     enqueue_times_.end(), std::move(packet)));
    UpdateQueueTime(enqueue_time);
    single_packet_queue_->SubtractPauseTime(pause_time_sum_);
    size_packets_ = 1;
    size_ += PacketSize(*single_packet_queue_);
  } else {
    // 如果single_packet_queue_有数据,先push到queue中
    MaybePromoteSinglePacketToNormalQueue();
    // 调用另外一个Push函数插入到队列中
    Push(QueuedPacket(priority, enqueue_time, enqueue_order,
                      enqueue_times_.insert(enqueue_time), std::move(packet)));
  }
}

Push插入数据逻辑总结来说是:

void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  // 根据报文ssrc找到对应的stream,没有找到则创建一个新的stream
  auto stream_info_it = streams_.find(packet.Ssrc());
  if (stream_info_it == streams_.end()) {
    stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
    // 暂时没有确定该stream的优先级,即没有被调度
    stream_info_it->second.priority_it = stream_priorities_.end();
    stream_info_it->second.ssrc = packet.Ssrc();
  }

  Stream* stream = &stream_info_it->second;

  // 如果该stream没有被调度,则加入到stream_priorities_中
  if (stream->priority_it == stream_priorities_.end()) {
    // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
  } else if (packet.Priority() < stream->priority_it->first.priority) {
    // 当前报文优先级比之前的小,说明优先级有变化,需要更新其优先级
    stream_priorities_.erase(stream->priority_it);
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
  }
  RTC_CHECK(stream->priority_it != stream_priorities_.end());

  // 还没有入队时间,说明是从single_packe_queue_中提升过来的
  // 注意packet只保存了一个enqueue_times_的iterator
  if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
    // Promotion from single-packet queue. Just add to enqueue times.
    packet.UpdateEnqueueTimeIterator(
        enqueue_times_.insert(packet.EnqueueTime()));
  } else {
    // In order to figure out how much time a packet has spent in the queue
    // while not in a paused state, we subtract the total amount of time the
    // queue has been paused so far, and when the packet is popped we subtract
    // the total amount of time the queue has been paused at that moment. This
    // way we subtract the total amount of time the packet has spent in the
    // queue while in a paused state.
    UpdateQueueTime(packet.EnqueueTime());
    packet.SubtractPauseTime(pause_time_sum_);

    size_packets_ += 1;
    size_ += PacketSize(packet);
  }

  // 插入到stream的queue中
  stream->packet_queue.push(packet);
}

3. pop数据

Pop数据的逻辑大致如下:

std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
  // 如果single_packe_queue_中有数据,则直接从single_packet_queue_中返回数据
  if (single_packet_queue_.has_value()) {
    RTC_DCHECK(stream_priorities_.empty());
    std::unique_ptr<RtpPacketToSend> rtp_packet(
        single_packet_queue_->RtpPacket());
    single_packet_queue_.reset();
    queue_time_sum_ = TimeDelta::Zero();
    size_packets_ = 0;
    size_ = DataSize::Zero();
    return rtp_packet;
  }

  RTC_DCHECK(!Empty());
  // 每次都是返回优先级最高的stream的报文
  Stream* stream = GetHighestPriorityStream();
  const QueuedPacket& queued_packet = stream->packet_queue.top();

  // 因为有弹出报文,优先级需要更新,更新部分见最后
  stream_priorities_.erase(stream->priority_it);

  // 计算报文在queue中的时间
  TimeDelta time_in_non_paused_state =
      time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
  queue_time_sum_ -= time_in_non_paused_state;

  // 删除该报文的queue time
  RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
  enqueue_times_.erase(queued_packet.EnqueueTimeIterator());

  // 报文发送后,更新stream发送的报文字节数,发送较少的stream应该有更高的调度优先级
  // 为了避免发送码率较低的stream一致处于较高优先级发送过多,限制了最低发送字节数
  DataSize packet_size = PacketSize(queued_packet);
  stream->size =
      std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
  max_size_ = std::max(max_size_, stream->size);

  size_ -= packet_size;
  size_packets_ -= 1;
  RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());

  std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
  stream->packet_queue.pop();

  // 如果剩余报文需要发送,更新调度优先级
  RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
  if (stream->packet_queue.empty()) {
    stream->priority_it = stream_priorities_.end();
  } else {
    int priority = stream->packet_queue.top().Priority();
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(priority, stream->size), stream->ssrc);
  }

  return rtp_packet;
}

原文出处:WebRTC Pacing模块

今天聊一下WebRTC中的Pacing模块,它的代码位于webrtc源码的src/modules/pacing目录,请结合源码阅读本文,加深对pacing的理解。

pacing、pacer、pace,顾名思义,有节奏、按照节奏发送报文的含义,一般我们也称作平滑发送。 pacing是拥塞控制中重要的一个环节,它受拥塞控制输出的估计带宽、拥塞窗口大小控制,按照节奏(一般是5ms)保证报文匀速地发送到网络中,可以避免短时间大量报文造成网络情况恶化。

本文将带着大家通过读代码方式深入pacing模块,了解pacing原理。

1. 为什么需要pacing

pacing和拥塞控制紧密相关,我们经常需要面对发送超过网络实际带宽出现拥塞的情况。

我们知道视频编码经过压缩后,随着画面复杂性变化,帧大小差异较大从几十k到几M都有(4k桌面共享可能会到几M)。如果将2M的视频编码输出数据在1ms内全部发送到网络中,对于很多网络会造成较大冲击。首先是客户端的socket buffer的输入远远大于输出,socket buffer满了后就会出现丢包;网络中路由器可能无法处理超过的突发流量,会造成网络拥塞甚至是突发的百分之几十的大丢包。

突发流量对网络的冲击造成的拥塞会造成发送端的评估带宽降低,会压制编码码率造成视频质量下降。一般来说,gcc的带宽估计机制也需要依赖pacing,如probing,一些机制可能会将发送码率hold在目标探测带宽,这需要依赖pacing来做到。

如果当前网络带宽存在瓶颈,我们必须要保证当前的发送码率不超过瓶颈,避免发送过多数据造成瞬间的拥塞。pacing可以将瞬间的码率(如1ms)平滑到长窗口中,牺牲发送时间换不拥塞。

在带宽不够的时候,我们可以在pacing中对报文根据优先级排队,优先保证重要的媒体流。

2. 阅读代码前

pacing模块涉及到几个比较独立的类,控制发送码率的IntervalBudget 、一个优先队列RoundRobinPacketQueue,在深入pacing模块时,在了解pacing大致流程后可以深入了解下这两个类,可以了解更精细的算法:

言剑:WebRTC IntervalBudget

言剑:WebRTC RoundRobinPacketQueue

新的代码里面, 有两种处理模式,周期调度方式(kPeriodic)、动态调度(kDynamic)。

// Periodic mode uses the IntervalBudget class for tracking bitrate
// budgets, and expected ProcessPackets() to be called a fixed rate,
// e.g. every 5ms as implemented by PacedSender.
// Dynamic mode allows for arbitrary time delta between calls to
// ProcessPackets.
enum class ProcessMode { kPeriodic, kDynamic };

比较传统的是继承Module,周期调用方式来调度;新的代码支持动态调度。最新的的代码Pacer有两个实现:

以下介绍先不介绍TaskQueuePacedSender这种新的Pacer,只关注传统的PacedSender周期调度方式。所以,后面涉及到kDynamic动态方式的代码都跳过了。

3. pacing模块对外接口

pacing模块接收输入的报文(继承RtpPacketSender)以及受目标码率控制(RtpPacketPacer),经过调度后输出要发送的报文(入参PacketRouter)。因此,PacedSender的实现如下:

class PacedSender : public Module,
                    public RtpPacketPacer,
                    public RtpPacketSender {
    PacedSender(Clock* clock,
        PacketRouter* packet_router,
        RtcEventLog* event_log,
        const WebRtcKeyValueConfig* field_trials = nullptr,
        ProcessThread* process_thread = nullptr);
    ... 
}

PacedSender实现了三个接口:

PacedSender其参数PacketRouter,可以将pacing后的rtp报文路由到不同模块去做实际发送。PacketRouter实际上是继承了PacingController::PacketSender

4. pacing工作流程

我们从一个包插入pacing后的处理看一下pacing的工作流程。该部分主要包含PacedSenderPacingController等类的处理,随着代码变迁,PacedSender基本包含什么逻辑,都直接通过PacingController中处理了, 因此一下重点关注这个类。

4.1 输入报文:PacedSender::EnqueuePackets

EnquuePackts也是直接把报文交给PacingController::EnqueuePacket处理。

void PacedSender::EnqueuePackets(
    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
  {
    rtc::CritScope cs(&critsect_);
    for (auto& packet : packets) {
      pacing_controller_.EnqueuePacket(std::move(packet));
    }
  }
  MaybeWakupProcessThread();
}

4.2 决策优先级并插入queue:PacingController::EnqueuePacket

PacingController::EnqueuePacket中会根据报文类型,给报文赋予发送优先级,并进行后续处理。

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  // Get priority first and store in temporary, to avoid chance of object being
  // moved before GetPriorityForType() being called.
  const int priority = GetPriorityForType(*packet->packet_type());
  EnqueuePacketInternal(std::move(packet), priority);
}

优先级的处理也比较简单:

int GetPriorityForType(RtpPacketMediaType type) {
  // Lower number takes priority over higher.
  switch (type) {
    case RtpPacketMediaType::kAudio:
      // Audio is always prioritized over other packet types.
      return kFirstPriority + 1;
    case RtpPacketMediaType::kRetransmission:
      // Send retransmissions before new media.
      return kFirstPriority + 2;
    case RtpPacketMediaType::kVideo:
    case RtpPacketMediaType::kForwardErrorCorrection:
      // Video has "normal" priority, in the old speak.
      // Send redundancy concurrently to video. If it is delayed it might have a
      // lower chance of being useful.
      return kFirstPriority + 3;
    case RtpPacketMediaType::kPadding:
      // Packets that are in themselves likely useless, only sent to keep the
      // BWE high.
      return kFirstPriority + 4;
  }
}

最终通过PacingController::EnqueuePacketInternal插入queue。在插入queue之前先需要做probing相关处理(probing、和padding功能暂且按下不表),以及会根据Pacing的处理模式(动态和周期两种模式)做budget的更新。

void PacingController::EnqueuePacketInternal(
    std::unique_ptr<RtpPacketToSend> packet,
    int priority) {
  prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
  // TODO(sprang): Make sure tests respect this, replace with DCHECK.
  Timestamp now = CurrentTime();
  if (packet->capture_time_ms() < 0) {
    packet->set_capture_time_ms(now.ms());
  }

  ...
  // 直接插入优先队列中
  packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}

4.3 Pacing模式以及budget处理

插入到queue中,报文插入就告一段落,pacing是另外一个线程中处理。 周期处理方式中用到两个budget来分配media和padding的带宽:

IntervalBudget media_budget_;
IntervalBudget padding_budget_;

关于IntervalBudget可以参考另一篇:

言剑:WebRTC IntervalBudget

从上面的IntervalBudget介绍可以知道,随着时间流逝,budget增加;报文发送后budget减少。pacing根据是否有budget剩余决定是否可以发送,如果有的话从RoundRobinQueue中取最高优先级的报文发送,通过这种方式可以控制发送码率为设置的码率。

budget如何控制:

4.4 pacing核心逻辑:PacingController::ProcessPackets

报文插入到queue中,pacer需要周期调度,在budget足够的时候,从queue中取出报文发送,处理逻辑主要看PacingController::ProcessPackets,函数比较长,这里带着大家啃下来。至于怎么从queue取出报文,先发送哪个可以看RoundRobinPacketQueue介绍:

言剑:WebRTC RoundRobinPacketQueue

删除了动态处理的代码后如下,主要逻辑可以总结为:根据拥塞状态以及budget是否足够,从queue中获取需要发送的报文,可以发送则发送媒体报文,否则判断是否发送padding;如果队列中数据过多,可能需要增大码率做排空处理;

void PacingController::ProcessPackets() {
  Timestamp now = CurrentTime();
  Timestamp target_send_time = now;
  ...

  Timestamp previous_process_time = last_process_time_;
  // 距离上次处理的时间,限制不超过2s
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);

  // 1) 保活,没啥好讲的
  if (ShouldSendKeepalive(now)) {
    // We can not send padding unless a normal packet has first been sent. If
    // we do, timestamps get messed up.
    if (packet_counter_ == 0) {
      last_send_time_ = now;
    } else {
      DataSize keepalive_data_sent = DataSize::Zero();
      std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
          packet_sender_->GeneratePadding(DataSize::Bytes(1));
      for (auto& packet : keepalive_packets) {
        keepalive_data_sent +=
            DataSize::Bytes(packet->payload_size() + packet->padding_size());
        packet_sender_->SendPacket(std::move(packet), PacedPacketInfo());
      }
      OnPaddingSent(keepalive_data_sent);
    }
  }

  if (paused_) {
    return;
  }

  // 2) 如果开启了drain_large_queues_,queue中的数据难以以当前速率在剩余时间内发送出去
  // 则适当提高当前发送码率(通过修改budget)
  if (elapsed_time > TimeDelta::Zero()) {
    DataRate target_rate = pacing_bitrate_;
    DataSize queue_size_data = packet_queue_.Size();
    if (queue_size_data > DataSize::Zero()) {
      // Assuming equal size packets and input/output rate, the average packet
      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
      // time constraint shall be met. Determine bitrate needed for that.
      packet_queue_.UpdateQueueTime(now);
      if (drain_large_queues_) {
        TimeDelta avg_time_left =
            std::max(TimeDelta::Millis(1),
                     queue_time_limit - packet_queue_.AverageQueueTime());
        DataRate min_rate_needed = queue_size_data / avg_time_left;
        if (min_rate_needed > target_rate) {
          target_rate = min_rate_needed;
          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
                              << target_rate.kbps();
        }
      }
    }

    // 提高当前budget,用于尽快排空
    if (mode_ == ProcessMode::kPeriodic) {
      // In periodic processing mode, the IntevalBudget allows positive budget
      // up to (process interval duration) * (target rate), so we only need to
      // update it once before the packet sending loop.
      media_budget_.set_target_rate_kbps(target_rate.kbps());
      UpdateBudgetWithElapsedTime(elapsed_time);
    } else {
      media_rate_ = target_rate;
    }
  }

  // 3) 获取probing大小
  bool first_packet_in_probe = false;
  bool is_probing = prober_.is_probing();
  PacedPacketInfo pacing_info;
  absl::optional<DataSize> recommended_probe_size;
  if (is_probing) {
    pacing_info = prober_.CurrentCluster();
    first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
    recommended_probe_size = DataSize::Bytes(prober_.RecommendedMinProbeSize());
  }

  DataSize data_sent = DataSize::Zero();

  // The paused state is checked in the loop since it leaves the critical
  // section allowing the paused state to be changed from other code.
  while (!paused_) {

    // 4) 第一次probing,padding包比较小,更可靠的探测
    if (small_first_probe_packet_ && first_packet_in_probe) {
      // If first packet in probe, insert a small padding packet so we have a
      // more reliable start window for the rate estimation.
      auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
      // If no RTP modules sending media are registered, we may not get a
      // padding packet back.
      if (!padding.empty()) {
        // Insert with high priority so larger media packets don't preempt it.
        EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
        // We should never get more than one padding packets with a requested
        // size of 1 byte.
        RTC_DCHECK_EQ(padding.size(), 1u);
      }
      first_packet_in_probe = false;
    }

    ...

    // 5) 获取需要发送的报文,需要检查是否拥塞、budget是否足够
    // Fetch the next packet, so long as queue is not empty or budget is not
    // exhausted.
    std::unique_ptr<RtpPacketToSend> rtp_packet =
        GetPendingPacket(pacing_info, target_send_time, now);

    // 6) 当前无法发送媒体包,检查是否发送padding
    if (rtp_packet == nullptr) {
      // No packet available to send, check if we should send padding.
      DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
      if (padding_to_add > DataSize::Zero()) {
        std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
            packet_sender_->GeneratePadding(padding_to_add);
        if (padding_packets.empty()) {
          // No padding packets were generated, quite send loop.
          break;
        }
        for (auto& packet : padding_packets) {
          EnqueuePacket(std::move(packet));
        }
        // Continue loop to send the padding that was just added.
        continue;
      }

      // Can't fetch new packet and no padding to send, exit send loop.
      break;
    }

    // 7) 封装发送报文,通过回调发送报文
    const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
    DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
                                           rtp_packet->padding_size());

    if (include_overhead_) {
      packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
                     transport_overhead_per_packet_;
    }
    packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);

    data_sent += packet_size;

    //  8) 发送完成后,更新一些统计以及budget
    // Send done, update send/process time to the target send time.
    OnPacketSent(packet_type, packet_size, target_send_time);
    if (recommended_probe_size && data_sent > *recommended_probe_size)
      break;
  }

  last_process_time_ = std::max(last_process_time_, previous_process_time);

  // 9) 更新probing状态
  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime(), data_sent.bytes());
    }
  }
}

细细啃下来,其实逻辑并不复杂。后面关注一下其他细节,比如拥塞窗口、probing等。

5. 拥塞窗口

在决定是否发送报文的时候(GetPendingPacket),我们发现有判断是否拥塞,因此有必要讲解下拥塞窗口:

发送报文过多会导致拥塞,这句话其实已经给了我们答案。一下代码里面的outstanding_data_就是正在网络中正在发送的报文, congestion_window_size_是一个阈值,如果正在网络中发送的报文超过这个阈值就认为是会导致拥塞:

bool PacingController::Congested() const {
  if (congestion_window_size_.IsFinite()) {
    return outstanding_data_ >= congestion_window_size_;
  }
  return false;
}

outstanding_data_在发送报文的时候增加,当对端收到通过FeedBack Ack后, outstanding_data_减小。而 congestion_window_size_是拥塞控制模块决策出来的一个阈值,此处先不深入挖掘。

6. Probing

Pacing模块里面有一个绕不开的功能是probing,对应bitrate_prober.h文件里面定义的BitrateProber。probing和带宽探测部分是紧密相关的,用于在发送带宽比较低的时候,可以探测到一个比较高的带宽。probing功能通过PacedSender::CreateProbeCluster暴露出去, pacing模块会根据当前probing状态,发送合适的padding包,达到目标探测码率。

关于probing的功能,后续会有一个专题专门介绍,此处不多介绍。

7. Packet发送

最后就剩下报文发送,我们知道pacing模块只保存了报文信息,真正的报文发送还需要通过上层模块完成。曾几何时,引入了PacketRouter这个,pacing模块调用PacketRouter::SendPacket完成报文发送。

大致看下这个函数,主要完成TransportSequenceNumber自增,通过报文ssrc找到合适的RtpRtcp模块去做进一步发送处理。

void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
                              const PacedPacketInfo& cluster_info) {
  MutexLock lock(&modules_mutex_);
  // Transport Sequence Number是在这个地方赋值的!!
  if (packet->HasExtension<TransportSequenceNumber>()) {
    packet->SetExtension<TransportSequenceNumber>((++transport_seq_) & 0xFFFF);
  }

  // 找到对应的模块去发送
  uint32_t ssrc = packet->Ssrc();
  auto kv = send_modules_map_.find(ssrc);
  if (kv == send_modules_map_.end()) {
    RTC_LOG(LS_WARNING)
        << "Failed to send packet, matching RTP module not found "
           "or transport error. SSRC = "
        << packet->Ssrc() << ", sequence number " << packet->SequenceNumber();
    return;
  }

  RtpRtcpInterface* rtp_module = kv->second;
  if (!rtp_module->TrySendPacket(packet.get(), cluster_info)) {
    RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
    return;
  }

  if (rtp_module->SupportsRtxPayloadPadding()) {
    // This is now the last module to send media, and has the desired
    // properties needed for payload based padding. Cache it for later use.
    last_send_module_ = rtp_module;
  }
}

PacketRouter提供了RtpRtcp模块的注册:

void AddSendRtpModule(RtpRtcpInterface* rtp_module, bool remb_candidate);
void RemoveSendRtpModule(RtpRtcpInterface* rtp_module);
void AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
                         bool remb_candidate);
void RemoveReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender);

PacketRouterRtpTransportControllerSend的一个成员,通过参数方式传入PacedSender。这个类也比较简单,想深入了解报文发送,可以结合文件分析下。

8. 写在最后

pacing模块算是啃完了,内容比较多,因此最后总结下,pacing模块的重点。