Source: WebRTC GCC Code Deep Dive (9): AimdRateControl

1. Introduction to AIMD

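AIMD, short for Additive Increase Multiplicative Decrease, is a method from TCP congestion control. GCC also uses AIMD, in its case to control the estimated bandwidth. Put simply: while there is no congestion, the congestion window grows linearly; when congestion occurs, it is cut multiplicatively (multiplied by a factor below 1). See the Wikipedia article for details: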

Additive increase/multiplicative decrease

AimdRateControl takes as input the output of TrendlineEstimator, which is one of three states (overuse, underuse, normal). It outputs the estimated bandwidth.
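The toy controller below illustrates the AIMD principle only; it is not GCC's code. On every non-congested tick it adds a fixed step, and on congestion it multiplies the rate by a factor below 1. The struct name `ToyAimd`, its fields and the values used are hypothetical.

```cpp
#include <cassert>

// Toy AIMD controller (hypothetical, for illustration only).
struct ToyAimd {
  double rate_kbps;  // current rate
  double step_kbps;  // additive increase applied per non-congested step
  double beta;       // multiplicative decrease factor, 0 < beta < 1

  // One control step: multiply by beta on congestion, otherwise add a step.
  double Step(bool congested) {
    if (congested) {
      rate_kbps *= beta;
    } else {
      rate_kbps += step_kbps;
    }
    return rate_kbps;
  }
};
```

With a 50 kbps step and beta = 0.5, a 1000 kbps rate goes to 1050, then 1100, then drops to 550 on congestion. This is the sawtooth typical of AIMD.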

2. Source Code Walkthrough

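AimdRateControl decides whether to raise or lower the current bandwidth estimate based on the trendline estimator's output (overuse, underuse, normal). Overall, the rate changes follow the AIMD principle.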

Some details are covered in the code walkthrough below.

2.1 State machine

AimdRateControl has three control states: Hold, Increase and Decrease. Its inputs are overusing, underusing and normal.

Figure: AIMD state machine

The implementation:

void AimdRateControl::ChangeState(const RateControlInput& input,
                                  Timestamp at_time) {
  switch (input.bw_state) {
    case BandwidthUsage::kBwNormal:
      // In Hold, a normal delay trend means we can start increasing the estimate again
      if (rate_control_state_ == RateControlState::kRcHold) {
        time_last_bitrate_change_ = at_time;
        rate_control_state_ = RateControlState::kRcIncrease;
      }
      break;
    case BandwidthUsage::kBwOverusing:
      // Overuse: switch to Decrease unless already there.
      // After one decrease, ChangeBitrate moves the state to Hold.
      if (rate_control_state_ != RateControlState::kRcDecrease) {
        rate_control_state_ = RateControlState::kRcDecrease;
      }
      break;
    case BandwidthUsage::kBwUnderusing:
      // Underuse: hold the rate
      rate_control_state_ = RateControlState::kRcHold;
      break;
    default:
      RTC_NOTREACHED();
  }
}

2.2 Increasing the rate (additive / multiplicative)

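Increases come in two forms: additive increase and multiplicative increase. Before the bandwidth bottleneck has been reached, multiplicative increase is used to reach the target rate quickly. Once the bottleneck has been reached, further probing must be cautious, so additive increase is used. Let's go through both in the code.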

The idea behind additive increase is to add one packet per (RTT + 100 ms).

DataRate AimdRateControl::AdditiveRateIncrease(Timestamp at_time,
                                               Timestamp last_time) const {
  double time_period_seconds = (at_time - last_time).seconds<double>();
  double data_rate_increase_bps =
      GetNearMaxIncreaseRateBpsPerSecond() * time_period_seconds;
  return DataRate::BitsPerSec(data_rate_increase_bps);
}

// Additive increase rate, in bps per second
double AimdRateControl::GetNearMaxIncreaseRateBpsPerSecond() const {
  // Frame size, assuming 30 frames per second
  const TimeDelta kFrameInterval = TimeDelta::Seconds(1) / 30;
  DataSize frame_size = current_bitrate_ * kFrameInterval;

  // Average packet size, assuming packets of at most 1200 bytes
  const DataSize kPacketSize = DataSize::Bytes(1200);
  double packets_per_frame = std::ceil(frame_size / kPacketSize);
  DataSize avg_packet_size = frame_size / packets_per_frame;

  // Approximate the over-use estimator delay to 100 ms.
  // Idea: add one packet per (RTT + 100 ms)
  TimeDelta response_time = rtt_ + TimeDelta::Millis(100);
  // In an experiment the response time is doubled: one packet per 2 x (RTT + 100 ms)
  if (in_experiment_) response_time = response_time * 2;
  double increase_rate_bps_per_second =
      (avg_packet_size / response_time).bps<double>();

  // Increase by at least 4000 bps (4 kbps) per second
  double kMinIncreaseRateBpsPerSecond = 4000;
  return std::max(kMinIncreaseRateBpsPerSecond, increase_rate_bps_per_second);
}
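To make the formula concrete, the sketch below recomputes GetNearMaxIncreaseRateBpsPerSecond with plain doubles (bps and seconds) instead of WebRTC's DataRate/DataSize/TimeDelta types. The function names are hypothetical. Take 1 Mbps with a 100 ms RTT as an example. One 30 fps frame is about 33,333 bits, which splits into 4 packets of about 8,333 bits each. The rate therefore grows by about 8,333 / 0.2 ≈ 41.7 kbps per second.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Hypothetical re-computation of GetNearMaxIncreaseRateBpsPerSecond with plain
// doubles: rates in bps, times in seconds. Assumes current_bitrate_bps > 0.
double NearMaxIncreaseRateBpsPerSecond(double current_bitrate_bps,
                                       double rtt_s,
                                       bool in_experiment) {
  const double kFrameIntervalS = 1.0 / 30.0;    // 30 frames per second
  const double kPacketSizeBits = 1200.0 * 8.0;  // 1200-byte packets
  const double frame_size_bits = current_bitrate_bps * kFrameIntervalS;
  const double packets_per_frame = std::ceil(frame_size_bits / kPacketSizeBits);
  const double avg_packet_size_bits = frame_size_bits / packets_per_frame;

  // One average packet per (RTT + 100 ms); doubled in the experiment.
  double response_time_s = rtt_s + 0.1;
  if (in_experiment) response_time_s *= 2.0;
  const double increase_bps_per_s = avg_packet_size_bits / response_time_s;

  const double kMinIncreaseRateBpsPerSecond = 4000.0;
  return std::max(kMinIncreaseRateBpsPerSecond, increase_bps_per_s);
}

// Additive increase accumulated over `elapsed_s` seconds (cf. AdditiveRateIncrease).
double AdditiveIncreaseBps(double current_bitrate_bps, double rtt_s,
                           double elapsed_s) {
  return NearMaxIncreaseRateBpsPerSecond(current_bitrate_bps, rtt_s, false) *
         elapsed_s;
}
```

At very low rates the 4000 bps floor dominates. At 10 kbps with a 500 ms RTT, for example, the formula alone would give only about 556 bps per second.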

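The idea behind multiplicative increase is to grow by a factor of 1.08 per second. The increment is current_bitrate × (1.08^t - 1), where t is the time since the last update, capped at 1 s. So the longer since the last update (up to 1 s), the larger the increase.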

DataRate AimdRateControl::MultiplicativeRateIncrease(
    Timestamp at_time,
    Timestamp last_time,
    DataRate current_bitrate) const {
  // First update (no previous time): alpha = 1.08, i.e. an 8% increase
  double alpha = 1.08;
  if (last_time.IsFinite()) {
    auto time_since_last_update = at_time - last_time;
    // Later updates: alpha = 1.08^t, with t (seconds since the last update) capped at 1 s; a longer gap gives a larger increase
    alpha = pow(alpha, std::min(time_since_last_update.seconds<double>(), 1.0));
  }

  // Increase by (alpha - 1) of the current rate, at least 1 kbps
  DataRate multiplicative_increase =
      std::max(current_bitrate * (alpha - 1.0), DataRate::BitsPerSec(1000));
  return multiplicative_increase;
}
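Below is the same formula with plain doubles, written as a hypothetical helper `MultiplicativeIncreaseBps`. It assumes rates in bps and times in seconds. An empty `elapsed_s` stands in for a non-finite `last_time`.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <optional>

// Hypothetical sketch of MultiplicativeRateIncrease (rates in bps, times in s).
// An empty `elapsed_s` means there is no previous update.
double MultiplicativeIncreaseBps(double current_bitrate_bps,
                                 std::optional<double> elapsed_s) {
  double alpha = 1.08;
  if (elapsed_s) {
    // 8% growth per second, with the exponent capped at 1 s.
    alpha = std::pow(alpha, std::min(*elapsed_s, 1.0));
  }
  // Increase by (alpha - 1) of the current rate, but by at least 1000 bps.
  return std::max(current_bitrate_bps * (alpha - 1.0), 1000.0);
}
```

Take a 1 Mbps rate. The first update adds 80 kbps. Half a second later the factor is 1.08^0.5 ≈ 1.039, which adds about 39 kbps. At very low rates the 1000 bps floor takes over.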

2.3 Decreasing the rate

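On a decrease, the estimate is set to beta times (0.85 by default) the measured acked throughput, rather than reduced at a fixed rate per second. The details are covered in the code later on.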

2.4 link capacity

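As the name suggests, link capacity is a utility that tracks the link bottleneck over a period of time. Because estimates fluctuate, the minimum capacity observed over a short period can be treated as a fairly safe estimate of what the link can carry. LinkCapacityEstimator is the class AIMD uses to obtain this current minimum link capacity.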

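LinkCapacityEstimator is updated only when the rate is decreased, and it is smoothed with a factor of 0.95. It returns an upper and a lower bound: the estimate plus and minus 3 standard deviations. Why 3? For a normal distribution, ±3 standard deviations cover about 99.7% of samples, so a value outside the band very likely reflects a real change.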

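Link capacity is updated on overuse. In the initial phase, the rate therefore has to increase multiplicatively until an overuse occurs, which means the link has hit its bottleneck. From then on, probing must be cautious. When the demanded bitrate is low, the bottleneck may never be reached, so link capacity may never be updated at all.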

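Whenever the acked bitrate goes above the upper bound or below the lower bound of the link capacity, the estimate is reset and the bottleneck is detected anew.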
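Below is a simplified sketch of such a tracker. It follows the description above: it updates on overuse, uses 0.95 smoothing (new-sample weight 0.05), reports bounds of ±3 standard deviations, and supports reset. The class `LinkCapacitySketch` is hypothetical, not WebRTC's LinkCapacityEstimator. It keeps a plain EWMA of the squared error and does not model any normalization or clamping the real class may apply. As a consequence, after a single sample the band here has zero width.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <limits>
#include <optional>

// Simplified link-capacity tracker (hypothetical sketch). Rates are in kbps.
class LinkCapacitySketch {
 public:
  // Fed with the acked throughput on every overuse-triggered decrease.
  void OnOveruseDetected(double acked_kbps) {
    constexpr double kAlpha = 0.05;  // new-sample weight, i.e. 0.95 smoothing
    if (!estimate_kbps_) {
      estimate_kbps_ = acked_kbps;  // the first sample initializes the estimate
      return;
    }
    const double error = acked_kbps - *estimate_kbps_;
    estimate_kbps_ = (1 - kAlpha) * *estimate_kbps_ + kAlpha * acked_kbps;
    variance_ = (1 - kAlpha) * variance_ + kAlpha * error * error;
  }

  bool has_estimate() const { return estimate_kbps_.has_value(); }
  double estimate() const { return estimate_kbps_.value(); }

  // Estimate +/- 3 standard deviations; unbounded when there is no estimate.
  double UpperBound() const {
    if (!estimate_kbps_) return std::numeric_limits<double>::infinity();
    return *estimate_kbps_ + 3 * std::sqrt(variance_);
  }
  double LowerBound() const {
    if (!estimate_kbps_) return 0.0;
    return std::max(0.0, *estimate_kbps_ - 3 * std::sqrt(variance_));
  }

  void Reset() {
    estimate_kbps_.reset();
    variance_ = 0.0;
  }

 private:
  std::optional<double> estimate_kbps_;
  double variance_ = 0.0;  // EWMA of the squared error, kbps^2
};
```

In ChangeBitrate, the caller would Reset() when the acked throughput exceeds UpperBound() while increasing, or falls below LowerBound() while decreasing.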

2.5 AIMD update

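The increase has two phases. Before the bandwidth bottleneck is found, the rate increases multiplicatively; once it is found, increases become cautious and additive. Decreases are always multiplicative.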

DataRate AimdRateControl::Update(const RateControlInput* input,
                                 Timestamp at_time) {
  // If the rate is still uninitialized 5 s after the first throughput estimate, use the measured throughput as the estimate
  if (!bitrate_is_initialized_) {
    const TimeDelta kInitializationTime = TimeDelta::Seconds(5);
    if (time_first_throughput_estimate_.IsInfinite()) {
      if (input->estimated_throughput)
        time_first_throughput_estimate_ = at_time;
    } else if (at_time - time_first_throughput_estimate_ >
                   kInitializationTime &&
               input->estimated_throughput) {
      current_bitrate_ = *input->estimated_throughput;
      bitrate_is_initialized_ = true;
    }
  }

  ChangeBitrate(*input, at_time);
  return current_bitrate_;
}

The core logic is in ChangeBitrate:

void AimdRateControl::ChangeBitrate(const RateControlInput& input,
                                    Timestamp at_time) {
  // estimated_throughput is the acknowledged (ACK) bitrate
  absl::optional<DataRate> new_bitrate;
  DataRate estimated_throughput =
      input.estimated_throughput.value_or(latest_estimated_throughput_);
  if (input.estimated_throughput)
    latest_estimated_throughput_ = *input.estimated_throughput;

  // An over-use should always trigger us to reduce the bitrate, even though
  // we have not yet established our first estimate. By acting on the over-use,
  // we will end up with a valid estimate.
  // Before the estimate is initialized, increasing or holding is meaningless: only an overuse is acted on, otherwise return.
  if (!bitrate_is_initialized_ &&
      input.bw_state != BandwidthUsage::kBwOverusing)
    return;

  // State transition, see ChangeState above
  ChangeState(input, at_time);

  // We limit the new bitrate based on the throughput to avoid unlimited bitrate
  // increases. We allow a bit more lag at very low rates to not too easily get
  // stuck if the encoder produces uneven outputs.
  // Cap the estimate at 1.5x the acked bitrate to avoid unbounded increases;
  // the extra 10 kbps keeps the increase from being too slow at low rates
  const DataRate troughput_based_limit =
      1.5 * estimated_throughput + DataRate::KilobitsPerSec(10);

  // 根据状态调整带宽
  switch (rate_control_state_) {
    // Hold: keep the rate unchanged
    case RateControlState::kRcHold:
      break;

    case RateControlState::kRcIncrease:
      // Acked bitrate above the upper bound (outside the ~99% band): the bottleneck may have changed, so re-detect it
      if (estimated_throughput > link_capacity_.UpperBound())
        link_capacity_.Reset();

      // Do not increase the delay based estimate in alr since the estimator
      // will not be able to get transport feedback necessary to detect if
      // the new estimate is correct.
      // If we have previously increased above the limit (for instance due to
      // probing), we don't allow further changes.
      // Special case for ALR: the send rate is low, so the delay trend cannot confirm a higher estimate; skip the increase
      if (current_bitrate_ < troughput_based_limit &&
          !(send_side_ && in_alr_ && no_bitrate_increase_in_alr_)) {

        // Link capacity is known (an overuse has been seen): increase cautiously, i.e. additively
        DataRate increased_bitrate = DataRate::MinusInfinity();
        if (link_capacity_.has_estimate()) {
          // The link_capacity estimate is reset if the measured throughput
          // is too far from the estimate. We can therefore assume that our
          // target rate is reasonably close to link capacity and use additive
          // increase.
          DataRate additive_increase =
              AdditiveRateIncrease(at_time, time_last_bitrate_change_);
          increased_bitrate = current_bitrate_ + additive_increase;
        }
        // No bottleneck found yet (no overuse so far): ramp up faster with the multiplicative increase (1.08x per second)
        else {
          // If we don't have an estimate of the link capacity, use faster ramp
          // up to discover the capacity.
          DataRate multiplicative_increase = MultiplicativeRateIncrease(
              at_time, time_last_bitrate_change_, current_bitrate_);
          increased_bitrate = current_bitrate_ + multiplicative_increase;
        }
        new_bitrate = std::min(increased_bitrate, troughput_based_limit);
      }

      time_last_bitrate_change_ = at_time;
      break;

    case RateControlState::kRcDecrease: {
      DataRate decreased_bitrate = DataRate::PlusInfinity();

      // Set bit rate to something slightly lower than the measured throughput
      // to get rid of any self-induced delay.
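      // beta defaults to 0.85: the new rate is 0.85x the acked throughput.
      // If that is still above the current estimate, use 0.85x the link-capacity estimate instead, which is more stable and usually lower.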
      decreased_bitrate = estimated_throughput * beta_;
      if (decreased_bitrate > current_bitrate_ && !link_capacity_fix_) {
        // TODO(terelius): The link_capacity estimate may be based on old
        // throughput measurements. Relying on them may lead to unnecessary
        // BWE drops.
        if (link_capacity_.has_estimate()) {
          decreased_bitrate = beta_ * link_capacity_.estimate();
        }
      }
      if (estimate_bounded_backoff_ && network_estimate_) {
        decreased_bitrate = std::max(
            decreased_bitrate, network_estimate_->link_capacity_lower * beta_);
      }

      // Avoid increasing the rate when over-using.
      if (decreased_bitrate < current_bitrate_) {
        new_bitrate = decreased_bitrate;
      }

      if (bitrate_is_initialized_ && estimated_throughput < current_bitrate_) {
        if (!new_bitrate.has_value()) {
          last_decrease_ = DataRate::Zero();
        } else {
          last_decrease_ = current_bitrate_ - *new_bitrate;
        }
      }

      // The acked throughput is below the link-capacity lower bound (outside the ~99% band), so the link-capacity estimate is no longer trusted
      if (estimated_throughput < link_capacity_.LowerBound()) {
        // The current throughput is far from the estimated link capacity. Clear
        // the estimate to allow an immediate update in OnOveruseDetected.
        link_capacity_.Reset();
      }

      bitrate_is_initialized_ = true;

      // Update link capacity on every overuse-triggered decrease
      link_capacity_.OnOveruseDetected(estimated_throughput);
      // Stay on hold until the pipes are cleared.
      // After a decrease, go to Hold
      rate_control_state_ = RateControlState::kRcHold;
      time_last_bitrate_change_ = at_time;
      time_last_bitrate_decrease_ = at_time;
      break;
    }
    default:
      RTC_NOTREACHED();
  }

  // Clamp the rate to the allowed range (see below)
  current_bitrate_ = ClampBitrate(new_bitrate.value_or(current_bitrate_));
}

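// Clamp the estimate: not above network_estimate_->link_capacity_upper (when estimate_bounded_increase_ is on),
// and not below min_configured_bitrate_, the configured lower bound of the estimate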
DataRate AimdRateControl::ClampBitrate(DataRate new_bitrate) const {
  if (estimate_bounded_increase_ && network_estimate_) {
    DataRate upper_bound = network_estimate_->link_capacity_upper;
    new_bitrate = std::min(new_bitrate, upper_bound);
  }
  new_bitrate = std::max(new_bitrate, min_configured_bitrate_);
  return new_bitrate;
}
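The decrease arithmetic and the throughput-based cap can be isolated as two small functions. This is a hypothetical sketch, assuming doubles in bps and the default beta of 0.85. It leaves out the estimate_bounded_backoff_/network_estimate_ branch and assumes link_capacity_fix_ is off. The function names are illustrative only.

```cpp
#include <cassert>
#include <optional>

// Hypothetical sketch of the kRcDecrease arithmetic (rates in bps).
// Returns the new estimate, or std::nullopt if the estimate stays unchanged.
std::optional<double> DecreasedBitrateBps(double current_bps,
                                          double acked_bps,
                                          std::optional<double> link_capacity_bps,
                                          double beta = 0.85) {
  double decreased = beta * acked_bps;
  // If beta * throughput would not lower the rate, fall back to
  // beta * link capacity when a link-capacity estimate exists.
  if (decreased > current_bps && link_capacity_bps) {
    decreased = beta * *link_capacity_bps;
  }
  // Never increase the rate while over-using.
  if (decreased < current_bps) return decreased;
  return std::nullopt;
}

// Cap used in the kRcIncrease branch: 1.5x the acked throughput + 10 kbps.
double ThroughputBasedLimitBps(double acked_bps) {
  return 1.5 * acked_bps + 10000.0;
}
```

Suppose the current estimate is 1 Mbps and 800 kbps is acked. The new estimate is then 680 kbps. If the estimate had already dropped to 500 kbps, 0.85 × 800 kbps would be an increase, so a link capacity of, say, 400 kbps is used instead, giving 340 kbps.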

3. Summary

The AIMD idea itself is fairly simple; in practice, most of the care goes into the details.


Source: WebRTC GCC Code Deep Dive (10): Delay-Based Bandwidth Estimation

1. Introduction

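Earlier articles covered the utility classes behind delay-based bandwidth estimation:

言剑: WebRTC GCC Code Deep Dive (7)

言剑: WebRTC GCC Code Deep Dive (8)

言剑: WebRTC GCC Code Deep Dive (9)

Here we use the higher-level DelayBasedBwe class to show how these classes are chained together to perform delay-based bandwidth estimation. DelayBasedBwe has little algorithmic content of its own and mostly calls the classes above, so this chapter is an easy read. If anything is unclear, refer back to the earlier articles.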

The overall flow of DelayBasedBwe:

Figure: delay based bwe

2. Source Code Walkthrough

DelayBasedBwe combines several functions; here we focus on details not covered earlier.

2.1 Definitions

The output result:

struct Result {
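  bool updated;   // whether the estimate was updated
  bool probe;     // whether the update came from a probe result
  DataRate target_bitrate = DataRate::Zero(); // target bitrate
  bool recovered_from_overuse;    // whether we just recovered from an overuse
  bool backoff_in_alr;    // whether the backoff happened while in ALR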
};

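This includes a recovered_from_overuse flag. Note that the code checks for a transition from underusing to normal, not from overusing. Presumably this is because, after an overuse and backoff, the draining queues show up as underuse, and the return to normal marks the recovery: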

if (prev_detector_state == BandwidthUsage::kBwUnderusing &&
    active_delay_detector_->State() == BandwidthUsage::kBwNormal) {
    recovered_from_overuse = true;
}

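The class definition below shows that DelayBasedBwe chains together InterArrivalDelta, TrendlineEstimator and AimdRateControl. It keeps two sets of detectors, one for audio and one for video; detecting audio and video separately can make detection more accurate.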

class DelayBasedBwe {
 public:

  // Feed in the feedback and trigger a bandwidth estimate
  Result IncomingPacketFeedbackVector(
      const TransportPacketsFeedback& msg,  // TCC feedback message
      absl::optional<DataRate> acked_bitrate, // acked bitrate computed from TCC feedback
      absl::optional<DataRate> probe_bitrate, // bitrate measured by a completed probe
      absl::optional<NetworkStateEstimate> network_estimate,  // network state estimate
      bool in_alr);  // whether currently in ALR
  ...

  // Force an overuse, optionally with a given bottleneck (link capacity)
  DataRate TriggerOveruse(Timestamp at_time,
                          absl::optional<DataRate> link_capacity);
 private:
  ...
  // Alternatively, run two separate overuse detectors for audio and video,
  // and fall back to the audio one if we haven't seen a video packet in a
  // while.
  // Separate overuse estimators for audio and video; when there is no video, the audio one can be used
  BweSeparateAudioPacketsSettings separate_audio_;
  int64_t audio_packets_since_last_video_;
  Timestamp last_video_packet_recv_time_;

  // Network state prediction
  NetworkStatePredictor* network_state_predictor_;

  // Video InterArrival and DelayIncreaseDetectorInterface (trendline estimator)
  // InterArrivalDelta was copied from InterArrival
  std::unique_ptr<InterArrival> video_inter_arrival_;
  std::unique_ptr<InterArrivalDelta> video_inter_arrival_delta_;
  std::unique_ptr<DelayIncreaseDetectorInterface> video_delay_detector_;

  // Audio InterArrival and DelayIncreaseDetectorInterface (trendline estimator)
  std::unique_ptr<InterArrival> audio_inter_arrival_;
  std::unique_ptr<InterArrivalDelta> audio_inter_arrival_delta_;
  std::unique_ptr<DelayIncreaseDetectorInterface> audio_delay_detector_;

  // The currently active detector (audio or video)
  DelayIncreaseDetectorInterface* active_delay_detector_;

  Timestamp last_seen_packet_;
  bool uma_recorded_;

  AimdRateControl rate_control_;    // AIMD control
  DataRate prev_bitrate_;           // previous bitrate
  bool has_once_detected_overuse_;  // whether an overuse has ever been detected
  BandwidthUsage prev_state_;       // previous bandwidth usage state
  const bool use_new_inter_arrival_delta_;  // whether to use the new InterArrivalDelta
  bool alr_limited_backoff_enabled_;        // whether ALR-limited backoff is enabled
};

2.2 IncomingPacketFeedback

This function takes the TCC feedback for each packet and feeds it into the other modules.

void DelayBasedBwe::IncomingPacketFeedback(const PacketResult& packet_feedback,
                                           Timestamp at_time) {
  // Reset if the stream has timed out.
  if (last_seen_packet_.IsInfinite() ||
      at_time - last_seen_packet_ > kStreamTimeOut) {
    // First TCC feedback, or more than 2 s since the last packet: recreate InterArrival and TrendlineEstimator
    ...

    // Default to the video TrendlineEstimator
    active_delay_detector_ = video_delay_detector_.get();
  }
  last_seen_packet_ = at_time;

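  // Audio packets are usually much smaller than video packets, and network devices may treat packets of different sizes differently.
  // So audio and video packets can be handled separately, controlled by separate_audio_.enabled:
  // audio packets go to the audio InterArrival/TrendlineEstimator, video packets to the video ones.
  // If disabled, everything goes to the video InterArrival/TrendlineEstimator.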

  // Feed the data into InterArrival and TrendlineEstimator
  ...
}

2.3 MaybeUpdateEstimate

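The rest needs little explanation, but this function deserves attention because it is the core of the bandwidth estimate. Pay particular attention to how an overuse is handled: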

DelayBasedBwe::Result DelayBasedBwe::MaybeUpdateEstimate(
    absl::optional<DataRate> acked_bitrate,
    absl::optional<DataRate> probe_bitrate,
    absl::optional<NetworkStateEstimate> state_estimate,
    bool recovered_from_overuse,
    bool in_alr,
    Timestamp at_time) {
  Result result;

  // Currently overusing the bandwidth.
  if (active_delay_detector_->State() == BandwidthUsage::kBwOverusing) {
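    // has_once_detected_overuse_: an overuse has been detected before
    // TimeToReduceFurther: whether another reduction is allowed yet
    // In ALR little data is sent, so the acked bitrate does not reflect the link;
    // use the previous estimate as the throughput instead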
    if (has_once_detected_overuse_ && in_alr && alr_limited_backoff_enabled_) {
      if (rate_control_.TimeToReduceFurther(at_time, prev_bitrate_)) {
        result.updated =
            UpdateEstimate(at_time, prev_bitrate_, &result.target_bitrate);
        // Flag that this backoff happened in ALR
        result.backoff_in_alr = true;
      }
    }
    // If an acked bitrate is available, adjust the rate based on it
    else if (acked_bitrate &&
               rate_control_.TimeToReduceFurther(at_time, *acked_bitrate)) {
      result.updated =
          UpdateEstimate(at_time, acked_bitrate, &result.target_bitrate);
    }
    // Overuse detected before any acked bitrate exists: cut the rate by 50% every 200 ms
    else if (!acked_bitrate && rate_control_.ValidEstimate() &&
               rate_control_.InitialTimeToReduceFurther(at_time)) {
      // Overusing before we have a measured acknowledged bitrate. Reduce send
      // rate by 50% every 200 ms.
      // TODO(tschumim): Improve this and/or the acknowledged bitrate estimator
      // so that we (almost) always have a bitrate estimate.
      rate_control_.SetEstimate(rate_control_.LatestEstimate() / 2, at_time);
      result.updated = true;
      result.probe = false;
      result.target_bitrate = rate_control_.LatestEstimate();
    }

    has_once_detected_overuse_ = true;
  } else {
    // Not overusing: if a probe measured a bitrate, set it into AIMD
    if (probe_bitrate) {
      result.probe = true;
      result.updated = true;
      result.target_bitrate = *probe_bitrate;
      rate_control_.SetEstimate(*probe_bitrate, at_time);
    } else {
      result.updated =
          UpdateEstimate(at_time, acked_bitrate, &result.target_bitrate);
      result.recovered_from_overuse = recovered_from_overuse;
    }
  }

  // Update prev_bitrate_ and prev_state_
  BandwidthUsage detector_state = active_delay_detector_->State();
  if ((result.updated && prev_bitrate_ != result.target_bitrate) ||
      detector_state != prev_state_) {
    DataRate bitrate = result.updated ? result.target_bitrate : prev_bitrate_;

    prev_bitrate_ = bitrate;
    prev_state_ = detector_state;
  }
  return result;
}

2.4 TimeToReduceFurther in AIMD

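TimeToReduceFurther: in AIMD, the rate may be reduced again under either of two conditions. The first is that at least one RTT (clamped to 10 to 200 ms) has passed since the last rate change. The second is that the acked bitrate has fallen below half of the latest estimate.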

bool AimdRateControl::TimeToReduceFurther(Timestamp at_time,
                                          DataRate estimated_throughput) const {
  const TimeDelta bitrate_reduction_interval =
      rtt_.Clamped(TimeDelta::Millis(10), TimeDelta::Millis(200));
  // At least one RTT (clamped to [10, 200] ms) since the last rate change
  if (at_time - time_last_bitrate_change_ >= bitrate_reduction_interval) {
    return true;
  }

  // i.e. bitrate_is_initialized_
  if (ValidEstimate()) {
    // TODO(terelius/holmer): Investigate consequences of increasing
    // the threshold to 0.95 * LatestEstimate().
    // Acked bitrate below 50% of the latest estimate
    const DataRate threshold = 0.5 * LatestEstimate();
    return estimated_throughput < threshold;
  }
  return false;
}
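The OR of the two conditions can be seen in a standalone sketch. It uses plain numbers (ms and bps), and a boolean flag replaces ValidEstimate(). The function and parameter names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical sketch of TimeToReduceFurther (times in ms, rates in bps).
bool TimeToReduceFurtherSketch(double now_ms, double last_change_ms,
                               double rtt_ms, bool estimate_valid,
                               double latest_estimate_bps, double acked_bps) {
  // Minimum spacing between rate changes: the RTT clamped to [10, 200] ms.
  const double interval_ms = std::clamp(rtt_ms, 10.0, 200.0);
  if (now_ms - last_change_ms >= interval_ms) return true;
  // Otherwise reduce early only if throughput fell below half the estimate.
  if (estimate_valid) return acked_bps < 0.5 * latest_estimate_bps;
  return false;
}
```

With a 500 ms RTT the interval is clamped to 200 ms. A reduction 100 ms after the last change is therefore allowed only if throughput has collapsed below half the estimate.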

2.5 InitialTimeToReduceFurther in AIMD

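As the code above shows, if an overuse occurs during initialization, before any acked bitrate exists, we should lower the rate. Should we wait a while first? AIMD controls this with the initial_backoff_interval_ setting. InitialTimeToReduceFurther decides whether to reduce the rate right away during initialization.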

bool AimdRateControl::InitialTimeToReduceFurther(Timestamp at_time) const {
  if (!initial_backoff_interval_) {
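    // No interval configured: a throughput just below half the estimate makes
    // TimeToReduceFurther return true whenever the estimate is valid, so every overuse backs off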
    return ValidEstimate() &&
           TimeToReduceFurther(at_time,
                               LatestEstimate() / 2 - DataRate::BitsPerSec(1));
  }
  // TODO(terelius): We could use the RTT (clamped to suitable limits) instead
  // of a fixed bitrate_reduction_interval.
  // Otherwise back off only if initial_backoff_interval_ has passed since the last decrease
  if (time_last_bitrate_decrease_.IsInfinite() ||
      at_time - time_last_bitrate_decrease_ >= *initial_backoff_interval_) {
    return true;
  }
  return false;
}

3. Summary

Most of delay-based BWE lives in the three utility classes. This class mainly stitches them together, although a few points are still worth noting.


Source: WebRTC GCC Code Deep Dive (11) Probe: ProbeBitrateEstimator

1. Introduction

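Raising the estimate through AIMD is slow, so probing can be used instead. Data is sent at a target rate for a short time. If the acknowledged rate reaches a certain fraction of the send rate, the probe is considered successful, and the available link bandwidth is taken to be the probed rate. ProbeBitrateEstimator is the class that estimates the acknowledged rate after probe packets are sent.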

2. How it works

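The probe controller decides whether to probe and how many probes to send. The pacing module sends the probe packets, filling with padding when there is not enough media. The receiver reports what it received via TCC feedback, and ProbeBitrateEstimator uses that feedback to estimate the rates at which probe data was sent and received.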

So ProbeBitrateEstimator takes TCC feedback as input and outputs an estimated bitrate.

3. Source Code Walkthrough

One probe sends multiple packets, so the estimate aggregates them per cluster:

struct AggregatedCluster {
    int num_probes = 0;  // number of probe packets with feedback
    Timestamp first_send = Timestamp::PlusInfinity();   // earliest send time
    Timestamp last_send = Timestamp::MinusInfinity();   // latest send time
    Timestamp first_receive = Timestamp::PlusInfinity();// earliest receive time
    Timestamp last_receive = Timestamp::MinusInfinity();// latest receive time
    DataSize size_last_send = DataSize::Zero();         // size of the last-sent packet, excluded from the send rate
    DataSize size_first_receive = DataSize::Zero();     // size of the first-received packet, excluded from the receive rate
    DataSize size_total = DataSize::Zero();             // total bytes sent
};

The bitrate of a probe is estimated in HandleProbeAndEstimateBitrate:

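Using the TCC acks, it computes the send rate and the receive rate and takes the smaller of the two as the estimate. A few details:

a probe only counts once enough of its packets and bytes have been received; one packet is excluded from each rate computation; and the receive/send ratio must fall within a plausible range.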

absl::optional<DataRate> ProbeBitrateEstimator::HandleProbeAndEstimateBitrate(
    const PacketResult& packet_feedback) {
  int cluster_id = packet_feedback.sent_packet.pacing_info.probe_cluster_id;
  // Drop clusters that are more than 1 s old relative to this feedback
  EraseOldClusters(packet_feedback.receive_time);

  AggregatedCluster* cluster = &clusters_[cluster_id];

  if (packet_feedback.sent_packet.send_time < cluster->first_send) {
    cluster->first_send = packet_feedback.sent_packet.send_time;
  }
  if (packet_feedback.sent_packet.send_time > cluster->last_send) {
    cluster->last_send = packet_feedback.sent_packet.send_time;
    cluster->size_last_send = packet_feedback.sent_packet.size;
  }
  if (packet_feedback.receive_time < cluster->first_receive) {
    cluster->first_receive = packet_feedback.receive_time;
    cluster->size_first_receive = packet_feedback.sent_packet.size;
  }
  if (packet_feedback.receive_time > cluster->last_receive) {
    cluster->last_receive = packet_feedback.receive_time;
  }
  cluster->size_total += packet_feedback.sent_packet.size;
  cluster->num_probes += 1;

  // At least 80% (kMinReceivedProbesRatio) of the probe packets sent must be received
  int min_probes =
      packet_feedback.sent_packet.pacing_info.probe_cluster_min_probes *
      kMinReceivedProbesRatio;
  // At least 80% (kMinReceivedBytesRatio) of the probe bytes sent must be received
  DataSize min_size =
      DataSize::Bytes(
          packet_feedback.sent_packet.pacing_info.probe_cluster_min_bytes) *
      kMinReceivedBytesRatio;
  // Not enough probe packets or bytes received yet: the rate would be inaccurate
  if (cluster->num_probes < min_probes || cluster->size_total < min_size)
    return absl::nullopt;

  TimeDelta send_interval = cluster->last_send - cluster->first_send;
  TimeDelta receive_interval = cluster->last_receive - cluster->first_receive;

  // Both intervals must be positive and no longer than kMaxProbeInterval (1 s)
  if (send_interval <= TimeDelta::Zero() || send_interval > kMaxProbeInterval ||
      receive_interval <= TimeDelta::Zero() ||
      receive_interval > kMaxProbeInterval) {
    return absl::nullopt;
  }
  // Since the `send_interval` does not include the time it takes to actually
  // send the last packet the size of the last sent packet should not be
  // included when calculating the send bitrate.
  // With N packets of size S sent every T, the span is (N-1)*T but the data is N*S, so one packet must be excluded
  DataSize send_size = cluster->size_total - cluster->size_last_send;
  DataRate send_rate = send_size / send_interval;

  // Since the `receive_interval` does not include the time it takes to
  // actually receive the first packet the size of the first received packet
  // should not be included when calculating the receive bitrate.
  // Same on the receive side: exclude one packet (the last sent for the send rate, the first received here)
  DataSize receive_size = cluster->size_total - cluster->size_first_receive;
  DataRate receive_rate = receive_size / receive_interval;

  double ratio = receive_rate / send_rate;

  // A receive rate above kMaxValidRatio (2x) the send rate is implausible; normally the ratio is about 1
  if (ratio > kMaxValidRatio) {
    return absl::nullopt;
  }

  DataRate res = std::min(send_rate, receive_rate);
  // If we're receiving at significantly lower bitrate than we were sending at,
  // it suggests that we've found the true capacity of the link. In this case,
  // set the target bitrate slightly lower to not immediately overuse.
  // kMinRatioForUnsaturatedLink = 0.9, kTargetUtilizationFraction = 0.95
  // A probe is meant to push the rate upward, so a receive rate below 90% of the
  // send rate suggests we have hit the link's real capacity. In that case use 95%
  // of the receive rate, i.e. slightly below capacity, to avoid an immediate overuse.
  if (receive_rate < kMinRatioForUnsaturatedLink * send_rate) {
    res = kTargetUtilizationFraction * receive_rate;
  }

  estimated_data_rate_ = res;
  return estimated_data_rate_;
}
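The rate computation for one complete cluster can be sketched on its own. It mirrors the min/max bookkeeping, the one-packet exclusions, the ratio check and the 0.9/0.95 rule above. `ProbePacket` and `ProbeBitrateBps` are hypothetical. The min-probes/min-bytes checks and cluster expiry are omitted, and times are in seconds and sizes in bytes.

```cpp
#include <algorithm>
#include <cassert>
#include <limits>
#include <optional>
#include <vector>

// One probe packet's feedback (hypothetical struct).
struct ProbePacket {
  double send_s;  // send time, seconds
  double recv_s;  // receive time, seconds
  double bytes;   // packet size
};

// Hypothetical sketch of the rate math in HandleProbeAndEstimateBitrate.
std::optional<double> ProbeBitrateBps(const std::vector<ProbePacket>& cluster) {
  constexpr double kInf = std::numeric_limits<double>::infinity();
  constexpr double kMaxProbeIntervalS = 1.0;
  constexpr double kMaxValidRatio = 2.0;
  constexpr double kMinRatioForUnsaturatedLink = 0.9;
  constexpr double kTargetUtilizationFraction = 0.95;

  double first_send = kInf, last_send = -kInf;
  double first_recv = kInf, last_recv = -kInf;
  double size_last_send = 0, size_first_recv = 0, size_total = 0;
  for (const ProbePacket& p : cluster) {
    if (p.send_s < first_send) first_send = p.send_s;
    if (p.send_s > last_send) { last_send = p.send_s; size_last_send = p.bytes; }
    if (p.recv_s < first_recv) { first_recv = p.recv_s; size_first_recv = p.bytes; }
    if (p.recv_s > last_recv) last_recv = p.recv_s;
    size_total += p.bytes;
  }
  const double send_interval = last_send - first_send;
  const double recv_interval = last_recv - first_recv;
  if (send_interval <= 0 || send_interval > kMaxProbeIntervalS ||
      recv_interval <= 0 || recv_interval > kMaxProbeIntervalS)
    return std::nullopt;

  // Exclude the last-sent packet from the send rate, the first-received from
  // the receive rate (8 bits per byte).
  const double send_rate = (size_total - size_last_send) * 8 / send_interval;
  const double recv_rate = (size_total - size_first_recv) * 8 / recv_interval;
  if (recv_rate / send_rate > kMaxValidRatio) return std::nullopt;

  double result = std::min(send_rate, recv_rate);
  if (recv_rate < kMinRatioForUnsaturatedLink * send_rate)
    result = kTargetUtilizationFraction * recv_rate;
  return result;
}
```

Five 1000-byte packets sent 5 ms apart give 4000 counted bytes over 20 ms, i.e. 1.6 Mbps. If the same packets arrive 10 ms apart, the receive rate is 800 kbps, which is below 90% of the send rate. The result is then 0.95 × 800 kbps = 760 kbps.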

4. Summary

ProbeBitrateEstimator estimates the result of a probe from TCC feedback, mainly from the send and receive rates within a single probe cluster.


Source: WebRTC GCC Code Deep Dive (12) Probe: ProbeController

1. Introduction

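In some scenarios, such as the initial phase or after the application's bandwidth demand grows, raising the estimate through AIMD is slow. GCC provides another tool for this: probing. The idea is to send data at the target rate for a short time. If the acknowledged rate reaches a certain fraction of the send rate, the probe is considered successful, and the available link bandwidth is taken to be the probed rate.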

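ProbeController decides whether to probe and at which rate. BitrateProber in the pacing module sends the probes. ProbeBitrateEstimator analyzes the results, determining whether a probe succeeded and what bitrate it measured.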

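Probing can quickly discover the bandwidth we need and has relatively little impact on the network. Its drawbacks are also clear. Because each probe is short, it fails easily under interference, and under jitter it can easily overestimate.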

Conditions that trigger probing:

2. Source Code Walkthrough

Probe states

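There are three probe states. kInit is the initial state, before any probing. In kWaitingForProbingResult, the controller waits for a probe result before probing further. kProbingComplete means probing has finished.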

Interface

class ProbeController {
 public:
  // Set the min, start and max bitrate
  ABSL_MUST_USE_RESULT std::vector<ProbeClusterConfig> SetBitrates(
      int64_t min_bitrate_bps,
      int64_t start_bitrate_bps,
      int64_t max_bitrate_bps,
      int64_t at_time_ms);

  // The total bitrate, as opposed to the max bitrate, is the sum of the
  // configured bitrates for all active streams.
  // Total bitrate demanded by all streams; the maximum probe bitrate may be limited based on it
  ABSL_MUST_USE_RESULT std::vector<ProbeClusterConfig>
  OnMaxTotalAllocatedBitrate(int64_t max_total_allocated_bitrate,
                             int64_t at_time_ms);

  // Network availability changed (e.g. route change): available or not
  ABSL_MUST_USE_RESULT std::vector<ProbeClusterConfig> OnNetworkAvailability(
      NetworkAvailability msg);

  // Set the estimated bitrate
  ABSL_MUST_USE_RESULT std::vector<ProbeClusterConfig> SetEstimatedBitrate(
      int64_t bitrate_bps,
      int64_t at_time_ms);

  // ALR related: in ALR little data is sent, so the estimate cannot be raised effectively; probing during ALR can be enabled
  void EnablePeriodicAlrProbing(bool enable);
  void SetAlrStartTimeMs(absl::optional<int64_t> alr_start_time);
  void SetAlrEndedTimeMs(int64_t alr_end_time);

  // Request a probe directly
  ABSL_MUST_USE_RESULT std::vector<ProbeClusterConfig> RequestProbe(
      int64_t at_time_ms);

  // Sets a new maximum probing bitrate, without generating a new probe cluster.
  // Set the max probe bitrate without triggering a probe
  void SetMaxBitrate(int64_t max_bitrate_bps);

  // Resets the ProbeController to a state equivalent to as if it was just
  // created EXCEPT for `enable_periodic_alr_probing_`.
  void Reset(int64_t at_time_ms);

  // Periodic task, mainly for probing upward in ALR
  ABSL_MUST_USE_RESULT std::vector<ProbeClusterConfig> Process(
      int64_t at_time_ms);

 private:
  bool network_available_;  // whether the network is available
  State state_;     // probe state
  int64_t min_bitrate_to_probe_further_bps_;    // a new estimate must exceed this to probe further
  int64_t time_last_probing_initiated_ms_;      // time the last probe was initiated
  int64_t estimated_bitrate_bps_;               // estimated bitrate
  int64_t start_bitrate_bps_;                   // start bitrate
  int64_t max_bitrate_bps_;                     // max bitrate
  int64_t last_bwe_drop_probing_time_ms_;       // time of the last probe after a BWE drop
  absl::optional<int64_t> alr_start_time_ms_;   // ALR start time
  absl::optional<int64_t> alr_end_time_ms_;     // ALR end time
  bool enable_periodic_alr_probing_;            // whether periodic ALR probing is enabled
  int64_t time_of_last_large_drop_ms_;          // time of the last large estimate drop
  int64_t bitrate_before_last_large_drop_bps_;  // bitrate before the last large drop
  int64_t max_total_allocated_bitrate_;         // max total allocated (demanded) bitrate

  ...

  int32_t next_probe_cluster_id_ = 1;           // global probe cluster id, auto-incremented

  ProbeControllerConfig config_;
};

InitiateProbing: starting a probe

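First, an internal function: how does ProbeController start a probe? InitiateProbing takes the current time, the list of bitrates to probe (usually increasing), and whether to keep probing afterwards.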

std::vector<ProbeClusterConfig> ProbeController::InitiateProbing(
    int64_t now_ms,
    std::vector<int64_t> bitrates_to_probe,
    bool probe_further) {
  int64_t max_probe_bitrate_bps =
      max_bitrate_bps_ > 0 ? max_bitrate_bps_ : kDefaultMaxProbingBitrateBps;

  // Optionally cap the probe rate by the total allocated bitrate:
  // there is no need to probe far beyond what the streams will actually use
  if (limit_probes_with_allocateable_rate_ &&
      max_total_allocated_bitrate_ > 0) {
    // If a max allocated bitrate has been configured, allow probing up to 2x
    // that rate. This allows some overhead to account for bursty streams,
    // which otherwise would have to ramp up when the overshoot is already in
    // progress.
    // It also avoids minor quality reduction caused by probes often being
    // received at slightly less than the target probe bitrate.
    max_probe_bitrate_bps =
        std::min(max_probe_bitrate_bps, max_total_allocated_bitrate_ * 2);
  }

  std::vector<ProbeClusterConfig> pending_probes;
  for (int64_t bitrate : bitrates_to_probe) {
    RTC_DCHECK_GT(bitrate, 0);

    // Cap the probe rate; once the cap is reached, no further probing is needed
    if (bitrate > max_probe_bitrate_bps) {
      bitrate = max_probe_bitrate_bps;
      probe_further = false;
    }

    // Create a probe cluster at `bitrate`, lasting kMinProbeDurationMs (15 ms) with at least
    // kMinProbePacketsSent (5) packets; each cluster gets its own probe id
    ProbeClusterConfig config;
    config.at_time = Timestamp::Millis(now_ms);
    config.target_data_rate =
        DataRate::BitsPerSec(rtc::dchecked_cast<int>(bitrate));
    config.target_duration = TimeDelta::Millis(kMinProbeDurationMs);
    config.target_probe_count = kMinProbePacketsSent;
    config.id = next_probe_cluster_id_;
    next_probe_cluster_id_++;
    MaybeLogProbeClusterCreated(event_log_, config);
    pending_probes.push_back(config);
  }
  time_last_probing_initiated_ms_ = now_ms;

  // If probing should continue, set min_bitrate_to_probe_further_bps_ (last probe rate x further_probe_threshold);
  // a new estimate below it does not trigger further probing
  if (probe_further) {
    state_ = State::kWaitingForProbingResult;
    min_bitrate_to_probe_further_bps_ =
        (*(bitrates_to_probe.end() - 1)) * config_.further_probe_threshold;
  } else {
    state_ = State::kProbingComplete;
    min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
  }
  return pending_probes;
}

Exponential probing at startup

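At the very beginning, exponential probing is used to reach the target rate as quickly as possible. It consists of two probes: by default, the first at 3× the start bitrate and the second at 6×: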

std::vector<ProbeClusterConfig> ProbeController::InitiateExponentialProbing(
    int64_t at_time_ms) {
  // Initial probing, to reach the target rate quickly. Two probes:
  // first_exponential_probe_scale x start (3x by default), second_exponential_probe_scale x start (6x)
  // When probing at 1.8 Mbps ( 6x 300), this represents a threshold of
  // 1.2 Mbps to continue probing.
  std::vector<int64_t> probes = {static_cast<int64_t>(
      config_.first_exponential_probe_scale * start_bitrate_bps_)};
  if (config_.second_exponential_probe_scale) {
    probes.push_back(config_.second_exponential_probe_scale.Value() *
                     start_bitrate_bps_);
  }
  return InitiateProbing(at_time_ms, probes, true);
}
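The following sketch combines the rate list built here with the per-cluster cap applied in InitiateProbing. It uses the 3×/6× scales from the text as default arguments and passes the cap in directly. In the real code the cap comes from max_bitrate_bps_ or kDefaultMaxProbingBitrateBps, which is not modeled here. `ProbePlan` and `ExponentialProbePlan` are hypothetical names, and the optional second scale is assumed to be always present.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sketch: the rates from InitiateExponentialProbing, followed by
// the per-cluster cap from InitiateProbing.
struct ProbePlan {
  std::vector<int64_t> bitrates_bps;  // one probe cluster per entry
  bool probe_further;                 // false once a cluster hits the cap
};

ProbePlan ExponentialProbePlan(int64_t start_bitrate_bps,
                               int64_t max_probe_bitrate_bps,
                               double first_scale = 3.0,
                               double second_scale = 6.0) {
  const std::vector<int64_t> requested = {
      static_cast<int64_t>(first_scale * start_bitrate_bps),
      static_cast<int64_t>(second_scale * start_bitrate_bps)};
  ProbePlan plan{{}, true};
  for (int64_t bitrate : requested) {
    if (bitrate > max_probe_bitrate_bps) {
      // Capped: this cluster is still sent, but no further probing follows.
      bitrate = max_probe_bitrate_bps;
      plan.probe_further = false;
    }
    plan.bitrates_bps.push_back(bitrate);
  }
  return plan;
}
```

A 300 kbps start bitrate yields probes at 900 kbps and 1.8 Mbps. With a 1 Mbps cap, the second probe is clamped to 1 Mbps and further probing is disabled.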

Probing triggered by SetBitrates

std::vector<ProbeClusterConfig> ProbeController::SetBitrates(
    int64_t min_bitrate_bps,
    int64_t start_bitrate_bps,
    int64_t max_bitrate_bps,
    int64_t at_time_ms) {
  if (start_bitrate_bps > 0) {
    start_bitrate_bps_ = start_bitrate_bps;
    estimated_bitrate_bps_ = start_bitrate_bps;
  } else if (start_bitrate_bps_ == 0) {
    start_bitrate_bps_ = min_bitrate_bps;
  }

  // The reason we use the variable `old_max_bitrate_pbs` is because we
  // need to set `max_bitrate_bps_` before we call InitiateProbing.
  int64_t old_max_bitrate_bps = max_bitrate_bps_;
  max_bitrate_bps_ = max_bitrate_bps;

  switch (state_) {
    // Initial state: probe exponentially
    case State::kInit:
      if (network_available_)
        return InitiateExponentialProbing(at_time_ms);
      break;

    case State::kWaitingForProbingResult:
      break;

    // Probing completed
    case State::kProbingComplete:
      // If the new max bitrate is higher than both the old max bitrate and the
      // estimate then initiate probing.
      // A higher max bitrate was set and the estimate is below it: probe up to the new max
      if (estimated_bitrate_bps_ != 0 &&
          old_max_bitrate_bps < max_bitrate_bps_ &&
          estimated_bitrate_bps_ < max_bitrate_bps_) {
        // The assumption is that if we jump more than 20% in the bandwidth
        // estimate or if the bandwidth estimate is within 90% of the new
        // max bitrate then the probing attempt was successful.
        // Success threshold: the lower of 1.2x the current estimate and 0.9x the new max bitrate
        mid_call_probing_succcess_threshold_ =
            std::min(estimated_bitrate_bps_ * 1.2, max_bitrate_bps_ * 0.9);
        mid_call_probing_waiting_for_result_ = true;
        mid_call_probing_bitrate_bps_ = max_bitrate_bps_;

        RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.Initiated",
                                   max_bitrate_bps_ / 1000);

        // Probe at max_bitrate_bps_ immediately
        return InitiateProbing(at_time_ms, {max_bitrate_bps_}, false);
      }
      break;
  }
  return std::vector<ProbeClusterConfig>();
}

Max total allocated bitrate updates

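When the maximum total allocated bitrate changes, probing can be used to reach the new target quickly. Here probing is triggered only in ALR. It takes two steps: by default, the first at 1× the maximum total allocated bitrate and the second at 2×.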

std::vector<ProbeClusterConfig> ProbeController::OnMaxTotalAllocatedBitrate(
    int64_t max_total_allocated_bitrate,
    int64_t at_time_ms) {
  const bool in_alr = alr_start_time_ms_.has_value();
  const bool allow_allocation_probe = in_alr;

  // Trigger a probe when the max total allocated bitrate changes
  if (state_ == State::kProbingComplete &&
      max_total_allocated_bitrate != max_total_allocated_bitrate_ &&
      estimated_bitrate_bps_ != 0 &&
      (max_bitrate_bps_ <= 0 || estimated_bitrate_bps_ < max_bitrate_bps_) &&
      estimated_bitrate_bps_ < max_total_allocated_bitrate &&
      allow_allocation_probe) {
    max_total_allocated_bitrate_ = max_total_allocated_bitrate;

    if (!config_.first_allocation_probe_scale)
      return std::vector<ProbeClusterConfig>();

    // Probe upward twice: the first probe at 1x, the second at 2x
    DataRate first_probe_rate =
        DataRate::BitsPerSec(max_total_allocated_bitrate) *
        config_.first_allocation_probe_scale.Value();
    DataRate probe_cap = config_.allocation_probe_max.Get();
    first_probe_rate = std::min(first_probe_rate, probe_cap);

    std::vector<int64_t> probes = {first_probe_rate.bps()};
    if (config_.second_allocation_probe_scale) {
      DataRate second_probe_rate =
          DataRate::BitsPerSec(max_total_allocated_bitrate) *
          config_.second_allocation_probe_scale.Value();
      second_probe_rate = std::min(second_probe_rate, probe_cap);
      if (second_probe_rate > first_probe_rate)
        probes.push_back(second_probe_rate.bps());
    }
    return InitiateProbing(at_time_ms, probes,
                           config_.allocation_allow_further_probing);
  }
  max_total_allocated_bitrate_ = max_total_allocated_bitrate;
  return std::vector<ProbeClusterConfig>();
}
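The probe-rate computation at the end of OnMaxTotalAllocatedBitrate can be isolated into a pure function. The sketch below is a simplified, hypothetical restatement (AllocationProbeRates is not a WebRTC function): it uses int64_t and std::optional<double> in place of DataRate and the field-trial config, and takes the scales and cap as parameters.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for the allocation-probe rate logic.
// first_scale / second_scale play the role of first_/second_allocation_probe_scale,
// probe_cap_bps plays the role of allocation_probe_max.
std::vector<int64_t> AllocationProbeRates(int64_t max_total_allocated_bps,
                                          std::optional<double> first_scale,
                                          std::optional<double> second_scale,
                                          int64_t probe_cap_bps) {
  std::vector<int64_t> probes;
  if (!first_scale) return probes;  // No first scale configured: no probe.
  int64_t first = std::min(
      static_cast<int64_t>(max_total_allocated_bps * *first_scale),
      probe_cap_bps);
  probes.push_back(first);
  if (second_scale) {
    int64_t second = std::min(
        static_cast<int64_t>(max_total_allocated_bps * *second_scale),
        probe_cap_bps);
    // The second probe is only worth sending if it goes higher than the first.
    if (second > first) probes.push_back(second);
  }
  return probes;
}
```

With scales 1 and 2 and a 2 Mbps allocation, this yields probes at 2 Mbps and 4 Mbps; a 3 Mbps cap turns that into 2 Mbps and 3 Mbps, and a 1.5 Mbps cap collapses both to a single 1.5 Mbps probe.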

Network loss and recovery

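When the network goes down, any pending probe is terminated immediately. When the network comes back and no probing has been done yet (the controller is still in kInit and a start bitrate has been set), exponential probing is started.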

std::vector<ProbeClusterConfig> ProbeController::OnNetworkAvailability(
    NetworkAvailability msg) {
  network_available_ = msg.network_available;
  // Network down: end probing immediately
  if (!network_available_ && state_ == State::kWaitingForProbingResult) {
    state_ = State::kProbingComplete;
    min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
  }
  // Network back up: start initial exponential probing
  if (network_available_ && state_ == State::kInit && start_bitrate_bps_ > 0)
    return InitiateExponentialProbing(msg.at_time.ms());
  return std::vector<ProbeClusterConfig>();
}
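The two transitions in OnNetworkAvailability can be modeled as a tiny state function. The sketch below is a hypothetical simplification (OnNetworkAvailabilitySketch and AvailabilityResult are illustrative names): it only reports whether exponential probing should start, whereas the real InitiateExponentialProbing also moves the controller to kWaitingForProbingResult, and it omits resetting min_bitrate_to_probe_further_bps_.

```cpp
#include <cassert>
#include <cstdint>

// Simplified copy of ProbeController's states (illustrative only).
enum class State { kInit, kWaitingForProbingResult, kProbingComplete };

// Hypothetical result: the new state plus whether to start exponential probing.
struct AvailabilityResult {
  State state;
  bool start_exponential_probing;
};

AvailabilityResult OnNetworkAvailabilitySketch(State state, bool available,
                                               int64_t start_bitrate_bps) {
  // Network went down while a probe was pending: give up on that probe.
  if (!available && state == State::kWaitingForProbingResult) {
    state = State::kProbingComplete;
  }
  // Network is up and no probing has happened yet: start exponential probing.
  bool start = available && state == State::kInit && start_bitrate_bps > 0;
  return {state, start};
}
```

Note that a network that comes back after probing has already completed does not restart exponential probing; only the kInit state does.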

Setting the estimated bitrate

std::vector<ProbeClusterConfig> ProbeController::SetEstimatedBitrate(
    int64_t bitrate_bps,
    int64_t at_time_ms) {
  if (mid_call_probing_waiting_for_result_ &&
      bitrate_bps >= mid_call_probing_succcess_threshold_) {
    mid_call_probing_waiting_for_result_ = false;
  }

  // If we are waiting for a probe result and the new estimate exceeds 0.7x the last probe bitrate, keep probing
  std::vector<ProbeClusterConfig> pending_probes;
  if (state_ == State::kWaitingForProbingResult) {
    // Continue probing if probing results indicate channel has greater
    // capacity.
    RTC_LOG(LS_INFO) << "Measured bitrate: " << bitrate_bps
                     << " Minimum to probe further: "
                     << min_bitrate_to_probe_further_bps_;

    if (min_bitrate_to_probe_further_bps_ != kExponentialProbingDisabled &&
        bitrate_bps > min_bitrate_to_probe_further_bps_) {
      pending_probes = InitiateProbing(
          at_time_ms,
          {static_cast<int64_t>(config_.further_exponential_probe_scale *
                                bitrate_bps)},
          true);
    }
  }

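  // If the estimate dropped sharply, remember the estimate from before the drop
  // This enables fast recovery: the next probe can start from this bitrate instead of from the minimum bitrate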
  if (bitrate_bps < kBitrateDropThreshold * estimated_bitrate_bps_) {
    time_of_last_large_drop_ms_ = at_time_ms;
    bitrate_before_last_large_drop_bps_ = estimated_bitrate_bps_;
  }

  estimated_bitrate_bps_ = bitrate_bps;
  return pending_probes;
}
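The two jobs of SetEstimatedBitrate, continuing exponential probing and recording large drops, can be modeled with a small struct. This is a hypothetical sketch, not WebRTC code: EstimateTrackerSketch is an illustrative name, the probe scale (2.0) and drop threshold (0.66) are assumed values standing in for config_.further_exponential_probe_scale and kBitrateDropThreshold, and std::nullopt stands in for kExponentialProbingDisabled. Unlike the real controller, it does not update the further-probe threshold after deciding to probe.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

struct EstimateTrackerSketch {
  // Assumed values for illustration only.
  double further_probe_scale = 2.0;
  double drop_threshold = 0.66;

  bool waiting_for_result = false;
  std::optional<int64_t> min_bitrate_to_probe_further_bps;  // nullopt = disabled
  int64_t estimated_bitrate_bps = 0;
  int64_t bitrate_before_last_large_drop_bps = 0;
  int64_t time_of_last_large_drop_ms = 0;

  // Returns the next probe bitrate, or nullopt if no further probe is needed.
  std::optional<int64_t> SetEstimatedBitrate(int64_t bitrate_bps,
                                             int64_t now_ms) {
    std::optional<int64_t> next_probe;
    // Keep probing while the estimate keeps beating the further-probe threshold.
    if (waiting_for_result && min_bitrate_to_probe_further_bps &&
        bitrate_bps > *min_bitrate_to_probe_further_bps) {
      next_probe = static_cast<int64_t>(further_probe_scale * bitrate_bps);
    }
    // A large drop: remember the pre-drop estimate for fast recovery.
    if (bitrate_bps < drop_threshold * estimated_bitrate_bps) {
      time_of_last_large_drop_ms = now_ms;
      bitrate_before_last_large_drop_bps = estimated_bitrate_bps;
    }
    estimated_bitrate_bps = bitrate_bps;
    return next_probe;
  }
};
```

For example, if the estimate falls from 900 kbps to 300 kbps, the drop is below 0.66 x 900 kbps, so 900 kbps is kept as the pre-drop bitrate that RequestProbe later uses.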

Periodic processing: ALR periodic probing and probe timeout

In ALR state, the controller periodically probes upward so that the estimate can reach the demanded bandwidth.

std::vector<ProbeClusterConfig> ProbeController::Process(int64_t at_time_ms) {
  // If a probe has not completed within 1 s, it times out and probing ends
  if (at_time_ms - time_last_probing_initiated_ms_ >
      kMaxWaitingTimeForProbingResultMs) {
    mid_call_probing_waiting_for_result_ = false;

    if (state_ == State::kWaitingForProbingResult) {
      RTC_LOG(LS_INFO) << "kWaitingForProbingResult: timeout";
      state_ = State::kProbingComplete;
      min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
    }
  }

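  // If periodic ALR probing is enabled, start a new probe once the interval has elapsed
  // The probe rate is 2x (config_.alr_probe_scale) the current estimate
  // The ALR probing interval (config_.alr_probing_interval) is 5 s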
  if (enable_periodic_alr_probing_ && state_ == State::kProbingComplete) {
    // Probe bandwidth periodically when in ALR state.
    if (alr_start_time_ms_ && estimated_bitrate_bps_ > 0) {
      int64_t next_probe_time_ms =
          std::max(*alr_start_time_ms_, time_last_probing_initiated_ms_) +
          config_.alr_probing_interval->ms();
      if (at_time_ms >= next_probe_time_ms) {
        return InitiateProbing(at_time_ms,
                               {static_cast<int64_t>(estimated_bitrate_bps_ *
                                                     config_.alr_probe_scale)},
                               true);
      }
    }
  }
  return std::vector<ProbeClusterConfig>();
}
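The timing rules in Process reduce to two comparisons: the timeout check and the computation of the next periodic ALR probe time. The helpers below (ProbeTimedOut and NextAlrProbeTimeMs) are hypothetical and take the timeout and interval as parameters instead of reading WebRTC constants or config.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical helper: a pending probe times out once more than
// max_wait_ms has passed since it was initiated.
bool ProbeTimedOut(int64_t now_ms, int64_t last_probe_ms, int64_t max_wait_ms) {
  return now_ms - last_probe_ms > max_wait_ms;
}

// Hypothetical helper: in ALR, the next periodic probe is due interval_ms
// after the later of the ALR start time and the last probe.
// Returns nullopt when not in ALR or when there is no estimate yet.
std::optional<int64_t> NextAlrProbeTimeMs(std::optional<int64_t> alr_start_ms,
                                          int64_t estimated_bitrate_bps,
                                          int64_t last_probe_ms,
                                          int64_t interval_ms) {
  if (!alr_start_ms || estimated_bitrate_bps <= 0) return std::nullopt;
  return std::max(*alr_start_ms, last_probe_ms) + interval_ms;
}
```

With a 5 s interval, if ALR started at t = 10 s and the last probe was at t = 3 s, the next periodic probe is due at t = 15 s; if a probe was already sent at t = 12 s, it moves to t = 17 s. Taking the later of the two timestamps prevents probing immediately on entering ALR right after a recent probe.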

Explicit probe requests

Used to recover quickly after a large drop in the estimated bandwidth.

std::vector<ProbeClusterConfig> ProbeController::RequestProbe(
    int64_t at_time_ms) {
  // Called once we have returned to normal state after a large drop in
  // estimated bandwidth. The current response is to initiate a single probe
  // session (if not already probing) at the previous bitrate.
  //
  // If the probe session fails, the assumption is that this drop was a
  // real one from a competing flow or a network change.
  bool in_alr = alr_start_time_ms_.has_value();
  bool alr_ended_recently =
      (alr_end_time_ms_.has_value() &&
       at_time_ms - alr_end_time_ms_.value() < kAlrEndedTimeoutMs);
  if (in_alr || alr_ended_recently || in_rapid_recovery_experiment_) {
    if (state_ == State::kProbingComplete) {
      // Probe at 0.85x the bitrate from before the drop
      uint32_t suggested_probe_bps =
          kProbeFractionAfterDrop * bitrate_before_last_large_drop_bps_;
      // The probe must confirm at least 0.95x the suggested rate; fast recovery is inherently risky, so the bar is strict
      uint32_t min_expected_probe_result_bps =
          (1 - kProbeUncertainty) * suggested_probe_bps;

      int64_t time_since_drop_ms = at_time_ms - time_of_last_large_drop_ms_;
      int64_t time_since_probe_ms = at_time_ms - last_bwe_drop_probing_time_ms_;

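      // The drop happened less than 5 s ago, and more than 5 s have passed since the last drop-recovery probe
      // The minimum expected result must exceed the current estimate, otherwise the probe is pointless
      // If all conditions hold, probe at the suggested bitrate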
      if (min_expected_probe_result_bps > estimated_bitrate_bps_ &&
          time_since_drop_ms < kBitrateDropTimeoutMs &&
          time_since_probe_ms > kMinTimeBetweenAlrProbesMs) {
        last_bwe_drop_probing_time_ms_ = at_time_ms;
        return InitiateProbing(at_time_ms, {suggested_probe_bps}, false);
      }
    }
  }
  return std::vector<ProbeClusterConfig>();
}
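The fast-recovery decision in RequestProbe can be expressed as a pure function of timestamps and bitrates. This sketch assumes the caller has already checked the ALR / rapid-recovery and kProbingComplete conditions; FastRecoveryProbeBps is a hypothetical name, and the constants use the values stated in the comments above (0.85, 0.95 = 1 - 0.05, 5 s, 5 s). It uses int64_t where the original uses uint32_t.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Values as stated in the article's comments (assumed, not read from WebRTC).
constexpr double kProbeFractionAfterDrop = 0.85;
constexpr double kProbeUncertainty = 0.05;
constexpr int64_t kBitrateDropTimeoutMs = 5000;
constexpr int64_t kMinTimeBetweenAlrProbesMs = 5000;

// Returns the probe bitrate to use, or nullopt if no recovery probe is warranted.
std::optional<int64_t> FastRecoveryProbeBps(int64_t now_ms,
                                            int64_t bitrate_before_drop_bps,
                                            int64_t estimated_bitrate_bps,
                                            int64_t time_of_last_drop_ms,
                                            int64_t last_drop_probe_ms) {
  const auto suggested =
      static_cast<int64_t>(kProbeFractionAfterDrop * bitrate_before_drop_bps);
  const auto min_expected =
      static_cast<int64_t>((1 - kProbeUncertainty) * suggested);
  const bool worthwhile = min_expected > estimated_bitrate_bps;
  const bool drop_is_recent = now_ms - time_of_last_drop_ms < kBitrateDropTimeoutMs;
  const bool not_probed_recently =
      now_ms - last_drop_probe_ms > kMinTimeBetweenAlrProbesMs;
  if (worthwhile && drop_is_recent && not_probed_recently) return suggested;
  return std::nullopt;
}
```

With a pre-drop estimate of 2 Mbps, the suggested probe is 1.7 Mbps, and it is only sent if the current estimate is below roughly 1.615 Mbps, the drop is under 5 s old, and no recovery probe was sent in the last 5 s.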

3. Summary

In short, ProbeController uses its various states to decide whether a probe is needed and at what bitrate to probe.