原文出处:基于tcmalloc的高并发内存池

内存池

池是在计算机技术中经常使用的一种设计模式,其内涵在于:将程序中需要经常使用的核心资源先申请出来,放到一个池内,由程序自己管理,这样可以提高资源的使用效率,也可以保证本程序占有的资源数量。经常使用的池技术包括内存池、线程池和连接池等,其中尤以内存池和线程 池使用最多。

内存池(Memory Pool) 是一种动态内存分配与管理技术。通常情况下,开发者习惯直接使 用new、delete、malloc、free等API申请分配和释放内存,这样导致的后果是:当程序长时间运行时,由于所申请内存块的大小不定,频繁使用时会造成大量的内存碎片从而降低程序和操作系统的性能。内存池则是在真正使用内存之前,先申请分配一大块内存(内存池)留作备用,当申请内存时,从池中取出一块动态分配,当释放内存时,将释放的内存再放入池内,再次申请池可以再取出来使用,并尽量与周边的空闲内存块合并。若内存池不够时,则自动扩大内存池,从操作系统中申请更大的内存池。

内存池主要解决的问题

1.内存碎片问题

内存碎片问题是指系统中存在大量不连续的、无法被有效利用的内存空间。这些不连续的内存块虽然总和能够满足内存需求,但由于它们被分割成了许多小块,导致系统无法为较大的内存请求提供连续的内存空间,从而降低了内存利用率和系统性能。

图片

假设系统依次分配了16byte、8byte、16byte、4byte,还剩余8byte未分配。这时要分配一个24byte的空间,操作系统回收了一个上面的两个16byte,总的剩余空间有40byte,但是却不能分配出一个连续24byte的空间,这就是内存碎片问题。

内存碎片主要分为两种:

内存碎片带来的影响:

使用内存池可以减少内存碎片问题。

2.申请效率问题

它通过预先分配一定数量的内存块,并在需要时从这些预分配的内存块中获取内存,可以提高内存申请的效率,减少内存申请和释放的开销。

有了内存池后,当动态申请内存时,直接从内存池中获取一个预分配的内存块,避免了动态内存分配的开销,提高了内存申请的效率,不再需要操作系统在内存频繁的申请内存了。

实现一个定长内存池

定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们可以将其性能做到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。

定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现“定长”。因此我们可以通过使用模板参数来实现“定长”,比如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。

定长内存池所需要的成员变量

对于定长内存池肯定需要先向系统申请一大块内存,然后对其进行管理。还需要知道剩余内存大小信息。对于已经释放的内存还需要管理再次使用。

对于已经释放的内存使用自由链表进行管理。

图解:

图片

图片

类图:

图片

将维护内存池的指针设置为char*,这样申请获得申请内存对象的大小后,直接对其进行加法操作即可。因为指针加整数的操作是根据指针类型来计算新的地址的。比如int* + 8,会让指针加上8个int大小的地址。即移动了32个字节。char* + 8 就只会移动8个字节。char* 方便对内存池进程切割

管理已释放内存

对于还给内存池的内存块,使用自由链表进行管理,自由链表维护了一个列表,其中每个节点代表一个可用的内存块,并且这些内存块可能是连续的或者分散的。自由链表的基本思想是,当需要分配内存时,可以从自由链表中选择一个可用的内存块分配给请求者;当释放内存时,将被释放的内存块添加到自由链表中,以便之后的内存分配请求可以使用。

同时也不需要对其设计专门的链式结构,让内存块的前4个字节(32位下)或者前8个字节(64位下)存储下一个内存块的地址即可。当有新的内存块释放时,对自由链表进行头插即可。

图片

注意:以32位平台为例:可能申请的对象不足4字节,在申请空间时,如果小于4字节,就多开一点空间最少开4字节,为了自由链表做牺牲。

定长内存池的实现

template<class T>
class ObjectPool
{
public:
  T* New() {
    T* obj = nullptr;
    //先使用已经释放的内存块
    if (_free_list != nullptr) {
      void* next = *((void**)_free_list);
      obj = (T*)_free_list;
      _free_list = next;
    } else {
      //剩余内存不够一个对象大小时,重新开空间
      if (_remain_bytes < sizeof(T)) {
        _remain_bytes = 128 * 1024;
        _memory = (char*)malloc(_remain_bytes);
        if (_memory == nullptr) {
          printf("malloc fail");
          exit(-1);
        }
      }

      obj = (T*)_memory;//类型匹配
      size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//保证对象能够存下指针的大小 比指针还小就多开点
      _memory += obj_size;//char* 用了多少直接加
      _remain_bytes -= obj_size;

    }
    //定位new在已分配的内存块上使用定位 new 构造对象
    new(obj)T;
    return obj;
  }

  void Delete(T* obj) {
    //显示调用该对象的析构函数清理该对象 防止该来中有动态资源没有释放造成内存泄漏。
    obj->~T();
    //自由链表头插
    *(void**)obj = _free_list;
    _free_list = obj;
  }

private:
  char* _memory = nullptr;//维护大的内存池指针
  size_t _remain_bytes = 0;//剩余内存字节数
  void* _free_list = nullptr;//管理释放回来的自由链表头指针
};

解析:

先不考虑有已经释放内存块的情况,图解整个为对象动态开辟空间的过程。

首先内存池会申请128kb大小的内存池,让对象obj指向内存池的起始地址。

图片

确定对象大小,如果不足地址的大小,就多开一些,为后面自由链表做牺牲。假设对象大小是32字节。维护内存池的_memory指针就会向后移动32字节,同时剩余地址字节数也会更新。(后面在申请时,根据内存池剩字节数和对象大小来判断是否需要重新开空间)

图片

然后通过定位new,在已分配的内存块上构造对象。最后返回对象的指针即可。到此不考虑有以释放内存的动态开辟空间的过程就完了。

释放已申请空间,释放已申请空间就是将内存块头插到自由链表的过程。需要注意的是,将内存块的头四个字节来存储下一个内存块的地址,存放是有技巧的。首先在申请的时候,如果对象不足一个地址的大小,就会按照一个地址的大小来开辟,32位下指针4个字节,64位下,指针8个字节。开空间很好开,但是要拿到对象的头四个字节或者八个字节是需要解引用操作的。指针的类型决定了指针解引用的大小。这样,32位平台和64位平台下,不使用二级指针,就要做条件判断。使用二级指针,就能很好的解决这个问题。

void**一个二级指针,在32位下,他的大小是4字节,64位下是8字节。因为指针大小不论类型,只有在不同的平台下才有差距。

但是如果对void**进行解引用操作,即*(void**),在不同的平台下就是不同的大小,32位下,void* 指针的大小是4个字节,而 void** 是指向 void* 指针的指针,因此其大小也是4个字节。当对 void** 进行解引用操作时就会访问void*大小字节数,也就是4个字节。在64位平台下,对其进行解引用操作,也会访问void*大小字节数,就是8个字节。因为指针解引用取决于指针指向数据类型的大小,比如一个指针指向int类型的数据,对其进行解引用就会访问4字节大小的地址。

当然不止是void**可以,只要是个二级指针就可以做到在32位平台下访问头四个字节,在64位下访问头8个字节。

拿到内存块对象的头4或8个字节后,就是自由链表头插的过程了。

自由链表头插:(32位下为例)

如果是第一次头插,_free_list == nullptr 让内存块对象的头四字节指向_free_list,再让_free_list指向内存块。

图片

如果不是第一次头插,也可以让新的内存块头四字节指向_free_list,再让_free_list指向新的内存块。

图片

这样就不用做特殊情况处理了,每次头插都可以使用同样的逻辑处理。

*(void**)obj = _free_list;
_free_list = obj;

以上就是整个内存释放的过程。

当自由链表不为空,说明下次为对象动态开辟空间时,就会先使用自由链表中的内存块。

void* next = *((void**)_free_list);
obj = (T*)_free_list;
_free_list = next;

自由链表中头四个字节存放的是下一个内存块的地址,先保存起来,在让obj指向自由链表,从自由链表中定位new即可(还是从内存池中申请空间,这里只是抽象了一下。从内存池中以释放的地址中进行申请)。最后更新一下自由链表即可。

图片

性能对比

性能测试要在release下进行测试,debug下会关闭优化,且可能会启用额外的运行时检查(如边界检查、空指针检查等)。

void test()
{
  // 申请释放的轮次
  const size_t Rounds = 5;

  // 每轮申请释放多少次
  const size_t N = 10000000;

  //使用new和delete
  std::vector<int*> v1;
  v1.reserve(N);
  size_t begin1 = clock();
  for (int i = 0; i < Rounds; ++i)
  {
    for (int j = 0; j < N; ++j)
    {
      v1.push_back(new int());
    }
    for (int j = 0; j < N; ++j)
    {
      delete v1[j];
    }  
    v1.clear();
  }
  size_t end1 = clock();
  //使用内存池
  std::vector<int*> v2;
  v2.reserve(N);
  ObjectPool<int> obj_pool;
  size_t begin2 = clock();
  for (int i = 0; i < Rounds; ++i)
  {
    for (int j = 0; j < N; ++j)
    {
      v2.push_back(obj_pool.New());
    }
    for (int j = 0; j < N; ++j)
    {
      obj_pool.Delete(v2[j]);
    }
    v2.clear();
  }
  size_t end2 = clock();

  cout << "USE NEW TIME = :" << end1 - begin1 << endl;
  cout << "USE OBJECTPOOL TIME = :" << end2 - begin2 << endl;
}

运行结果:

图片

对于一千万内置类型数据,申请释放五次,使用内存池会快27倍的。非常的恐怖。

自定义一个二叉树节点类

struct TreeNode
{
  int _val;
  TreeNode* _left;
  TreeNode* _right;

  TreeNode()
    :_val(0)
    , _left(nullptr)
    , _right(nullptr)
  {}
};

对其进行测试

void test()
{
  // 申请释放的轮次
  const size_t Rounds = 5;

  // 每轮申请释放多少次
  const size_t N = 10000000;

  //使用new和delete
  std::vector<TreeNode*> v1;
  v1.reserve(N);
  size_t begin1 = clock();
  for (int i = 0; i < Rounds; ++i)
  {
    for (int j = 0; j < N; ++j)
    {
      v1.push_back(new TreeNode());
    }
    for (int j = 0; j < N; ++j)
    {
      delete v1[j];
    }  
    v1.clear();
  }
  size_t end1 = clock();
  //使用内存池
  std::vector<TreeNode*> v2;
  v2.reserve(N);
  ObjectPool<TreeNode> obj_pool;
  size_t begin2 = clock();
  for (int i = 0; i < Rounds; ++i)
  {
    for (int j = 0; j < N; ++j)
    {
      v2.push_back(obj_pool.New());
    }
    for (int j = 0; j < N; ++j)
    {
      obj_pool.Delete(v2[j]);
    }
    v2.clear();
  }
  size_t end2 = clock();

  cout << "USE NEW TIME = :" << end1 - begin1 << endl;
  cout << "USE OBJECTPOOL TIME = :" << end2 - begin2 << endl;
}

运行结果:

图片

同样对于一千万自定义类型数据,进行五轮申请释放,使用内存池会快将近18倍。

这里内存池和new底层都是使用malloc进行的

图片

malloc函数在内部可能会实现一些简单的内存池管理策略。可以不通过malloc直接使用系统调用接口向堆区申请空间。在windows下使用VirtualAlloc系统调用接口,用于在进程的虚拟地址空间中分配内存。它可以用于分配一块指定大小的内存,并返回指向这块内存的指针。在Linux下,可以调用brk或mmap函数。

#ifdef _WIN32
#include <Windows.h>
#else
//...Linux
#endif

//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
  void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
  // linux下brk mmap等
        void* ptr = mmap(0, kpage << 13, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
#endif
  if (ptr == nullptr)
  {
    printf("malloc fail");
    exit(-1);
  }
  return ptr;
}

在内存池new的时候调用SystemAlloc进行申请

_memory = (char*)SystemAlloc(_remain_bytes >> 13);

这样,动态申请内存的时候就完全脱离malloc了。VirtualAlloc 函数在 Windows 系统中以页面(page)为单位进行内存申请。页面大小是操作系统的内存管理单位,通常是 8KB。128*1024字节,右移13位就是除以8kb。申请的时候申请16页即可。

高并发内存池(concurrent memory pools)

在高并发多线程的环境下,在申请内存的时,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是在并发场景下可能会因为频繁的加锁和解锁导致效率有所降低,而该项目的原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的内存池。

对比上面的定长内存池,高并发内存池不仅要解决内存碎片问题和效率问题,还要解决多线程环境下,锁竞争的问题。

对比malloc,malloc本身已经很优秀了,它在性能方面可能不够高效,特别是在多线程环境下,可能存在锁竞争等问题。tcmalloc(Thread-Caching Malloc)是由 Google 开发的一种针对多线程应用的高性能内存分配器。它采用了线程本地缓存技术,减少了线程间的竞争,提高了性能。

tcmalloc 在空间利用率方面可能比 malloc 更好。它有更好的内存回收策略和碎片整理机制,可以减少内存碎片化,提高内存利用率。

Concurrent Memory Pools主要由3个部分组成

1.线程缓存 (thread cache):

thread cache 其主要思想是为每个线程维护一个本地的内存缓存,以减少多线程并发操作时的锁竞争,提高内存分配的性能。thread cache 是每个线程独有的,用于小于256kb的内存分配,线程从这里申请内存不需要加锁,每个线程独享一个thread cache。这也是并发内存池高效的一个地方。

2.中心缓存(central cache):

中心缓存是所有线程共享的,thread cache 按需从central cache中获取对象。central cache周期性的回收thread cache的对象。避免一个线程占用太多的内存,导致其他线程内存吃紧。做到内存分配在多线程中更加均衡的按需调度的目的。central cache是存在竞争的,从这里取内存对象是需要加锁保护的。

3.页缓存(page cache):

页缓存是在central cache上面的一层缓存,存储的内存是以页为单位存储以及分配的。central cache没有内存时,从page cache分配出一定数量的page,并切割成定长大小的小块内存块,分配给central cache。page cache没有足够的内存时,就会向堆区申请。

图片

下面就一一实现各层,看看是如何将性能发挥到极致的。

Thread Cache

定长内存池只支持固定大小对象内存的申请和释放,在定长内存池中只需要一个自由链表即可管理。在高并发内存池中,要支持不同大小对象内存的申请和释放,这就需要多个自由链表进行管理。thread cache 本质是一个哈希桶,每个桶都是大小不同的自由链表。逻辑图如下:

图片

Thread Cache支持小于256Kb内存申请。最小支持8字节的内存申请(因为64位下,回收回来的内存要使用自由链表管理,自由链表使用头部地址存放一下个内存块的地址,64位下,一个地址至少8字节)。

如果从8字节开始到256Kb,每个大小都挂一个自由链表,至少需要256*1024 - 8个自由链表,也就是26万多个桶。仅只是保存这些自由链表,至少都需要256kb空间。释放的时候,自由链表下在存放一些内存块,一个Thread Cache对象都占用很大的空间了。而Thread Cache在多线程环境下是每个线程独享的,创建线程的时候就会分配Thread Cache对象。一个进程,多创建几个线程,仅仅维护Thread Cache对象都需要很大的内存空间了。

解决这个问题,参考tcmalloc采用了特定对齐规则。让这些字节数进行向上对齐,比如申请1个字节时,会直接申请8个字节,剩余的可以七字节不用。申请9字节时,会直接给16字节。

对齐规则并不是256KB内的都按照8Byte进行对齐,如果都按照8Byte对齐,256Kb则需要256*1024 / 8 = 32768个桶,还是比较多。让不同范围的按照不同的对齐规则进行对齐,对齐规则如下:

字节数

对齐数

自由链表下标

[1-128]

8 Byte

[0,16)

[128+1,1024]

16 Byte

[16,72)

[ 1024 + 1 , 8 * 1024]

128 Byte

[72,128)

[8* 1024 + 1, 64 * 1024]

1024 Byte

[128,184)

[64*1024+1,256 * 1024 ]

8*1024 Byte

[184,208)

使用上面对齐规则,桶的个数控制在了208个,也就是自由链表的下标是[0-207]。逻辑图如下:

图片

采用对齐规则后,一定会带来内碎片的问题。使用好的对齐规则,可以减少空间浪费率。空间浪费率:

图片

使用上面的对齐规则后,根据公式浪费率最大就是分子最大且分母最小的时候。

不考虑第一个区间,因为第一个区间的浪费率太大了。这里浪费率从第二区间开始。按照上面的对齐规则,空间浪费率最低。

采用上面的内存对齐规则,可以控制空间浪费率到10%左右。tcmalloc实际的对齐规则比这还要复杂很多,但是基本原理是这样的,就是让浪费率最低即可。

实现一个类,专门计算向上对齐规则。这里参考了tcmalloc的计算规则。

class SizeClass
{
public:  
  static const size_t MAX_BYTES = 256 * 1024;

  // tcmalloc库的写法 返回实际开辟字节数
  static inline size_t _RoundUp(size_t bytes, size_t align)
  {
    return ((bytes + align - 1) & ~(align - 1));
  }

  //根据对象大小计算对齐规则   
  static inline size_t RoundUp(size_t bytes)
  {
    assert(bytes <= MAX_BYTES);
    if (bytes <= 128)
    {
      return _RoundUp(bytes, 8);//按照8字节对齐
    }
    else if (bytes <= 1024)
    {
      return _RoundUp(bytes, 16);//按照16字节对齐
    }
    else if (bytes <= 8 * 1024)
    {
      return _RoundUp(bytes, 128);//按照128字节对齐
    }
    else if (bytes <= 64 * 1024)
    {
      return _RoundUp(bytes, 1024);//按照1024字节对齐
    }
    else if (bytes <= 256 * 1024)
    {
      return _RoundUp(bytes, 8*1024);//按照8*1024字节对齐
    }
    else
    {
      assert(false);
      return -1;
    }
  }
};

Thread Cache的实现

ThreadCache 核心就是自由链表 这里封装一个自由链表,做为Thread Cache类的成员。

class FreeList
{
public:
  //头插
  void Push(void* obj)
  {
    *(void**)obj = _free_list;
    _free_list = obj;

    ++_size;
  }

  //头插一段链表
  void PushRound(void * start,void* end,size_t size)
  {
    *(void**)end = _free_list;
    _free_list = start;

    _size += size;
  }

  //头删
  void*Pop()
  {
    void* next = *((void**)_free_list);
    void* obj = _free_list;
    _free_list = next;

    --_size;
    return obj;
  }   

  //头删一段链表
  void PopRound(void*& start,void*& end,size_t size)
  {
    start = _free_list;
    end = start;
    for (size_t i = 0; i < size - 1; ++i)
    {
      end = NextObj(end);
    }
    _free_list = NextObj(end);

    _size -= size;
    NextObj(end) = nullptr;
  }

  bool IsEmpty()
  {
    return _free_list == nullptr;
  }

  //获取当前链表最大个数
  size_t& MaxSize()
  {
    return _max_size;
  }

  //获取当前链表元素个数
  size_t Size()
  {
    return _size;
  }

private:
  void* _free_list = nullptr;
  size_t _max_size = 1;
  size_t _size = 0;
};

Thread Cache类的声明。

#pragma once
#include "Common.h"

class ThreadCache
{
public:
  //申请内存对象
  void* Allocate(size_t size);
  //释放内存对象
  void Deallocate(void* ptr,size_t size);
  // 从中心缓存获取对象
  void* FetchFromCentralCache(size_t index, size_t size);
  // 释放对象时,链表过长时,回收内存回到中心缓存
  void ListTooLong(FreeList& list, size_t size);
private:
  FreeList _free_list[NFREELISTS];//管理切分好的自由链表(哈希桶)
};

//使用 __declspec(thread) 声明的变量将为每个线程创建一个独立的副本
//每个线程都可以在其独立的存储空间中访问该变量,而不会影响其他线程的副本。
//让这个指针维护thread cache进行内存时申请和释放
static __declspec(thread) ThreadCache* tls_threadcache = nullptr;

其中ThreadCache的属性有:

ThreadCache提供的方法有:

ThreadCache支持无锁并发访问

小于256kb的内存申请直接从ThreadCache中进行申请(如果有)。一般进程申请空间的时候,小块内存的申请更加频繁,几乎很少申请一大块空间。

多线程环境下,多个线程向内存池申请内存时,不加锁和加锁的效率是两个等级的。Thread Cache在设计的时候,就是让每一个线程独享一个Thread Cache对象,这样,在申请小对象的时候,可以有效的提高效率。

让ThreadCache支持无锁并发访问,就是让每个线程独立一个ThreadCache对象。需要用到线程局部存储TLS(Thread Local Storage)机制。

TLS机制:

TLS对象声明

static __declspec(thread) ThreadCache* tls_threadcache = nullptr;

TLS使用方法

使用new tls_threadcache = new ThreadCache();

脱离new,直接使用上面实现的定长内存池进行申请

static ObjectPool<ThreadCache> thread_cache_pool;
tls_threadcache = thread_cache_pool.New();

Thread Cache申请内存

Thread Cache申请内存通过Allocate函数进行申请。实现Allocate函数。

void* ThreadCache::Allocate(size_t size)
{
  assert(size <= MAX_BYTES);
  //确定桶的位置
  size_t index = SizeClass::Index(size);
  //确定对齐规则
  size_t align_size = SizeClass::RoundUp(size);
  //eg 申请127字节  按照8字节对齐,实际align_size = 128, index = 15。
  if (_free_list[index].IsEmpty())//空就去中心缓存获取 暂时不考虑大于256kb的情况
  {
    //中心缓存一次可能会多申请空间,只返回当前对齐数的大小,剩余的存到thread cache的自由链表中
    return FetchFromCentralCache(index, align_size);
  }
  else
  {
    return _free_list[index].Pop();
  }
}

从Central Cache获取对象时,也是有对应的策略的。Central Cache 是多线程共享的,频繁的申请要加锁,会有锁的竞争,会降低效率的。所以,参考tcmalloc,使用一定的反馈算法来一次批量的申请内存。

比如频繁的从Central Cache 中申请8Kb大小的内存时,会根据慢反馈调节算法,一次批量的申请多个大小的内存块。

比如第一次申请8Kb,申请一个,第二次申请时,申请两个,返回一个,多申请的交给自由链表管理,下次再申请8kb的时候,Thread Cache的自由链表就有内存了,不用再去向Central Cache申请了。以此类推,当然,这个算法还要定一个上限,不能一下申请好几万个交给自由链表管理。上限规定,一次最多申请512个内存块,申请太多会成负担的。【这个机制和网络中的tcp协议的慢启动 机制类似】

static size_t NumMoveSize(size_t size)
{
    assert(size > 0);

    // [2, 512],一次批量移动多少个对象的(慢启动)上限值
    // 小对象一次批量上限高
    // 小对象一次批量上限低
    int num = MAX_BYTES / size;
    if (num < 2)
            num = 2;

    if (num > 512)
            num = 512;

    return num;
}

根据慢反馈调节算法,决定了一次从Central Cache申请多少内存。如果申请到1个的话,就直接返回给Allocate即可,申请到多个,将剩余的一部分插入到自由链表中(自由链表提供PushRound()函数,可以一次插入一段链表),将第一个内存块返回给Allocate即可。

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
  // 慢开始反馈调节算法
  //NumMoveSize决定上限  
  size_t batch_num = min(_free_list[index].MaxSize(),SizeClass::NumMoveSize(size));
  if (batch_num == _free_list[index].MaxSize())
  {
    _free_list[index].MaxSize()+=1;
  }

  // 从中心缓存中获取一段内存 可能会多申请 多申请的放到thread cache即可
  void* start = nullptr;
  void* end = nullptr;
  size_t actual_num = CentralCache::GetInstace()->FetchRangeObj(start, end, batch_num, size);
  assert(actual_num >= 1);//至少申请一个
  if (actual_num == 1)
  {
    return start;
  }
  else
  {
    _free_list[index].PushRound(NextObj(start), end, actual_num -1 );
    return start;
  }
}

Thread Cache释放内存

Thread Cache申请内存通过Deallocate函数进行申请。实现Deallocate函数。

void ThreadCache::Deallocate(void* ptr, size_t size)
{
  //根据大小确定桶的位置
  size_t index = SizeClass::Index(size);
  _free_list[index].Push(ptr);
  //自由链表超过批量申请的大小  还给Cetral Cache
  if (_free_list[index].Size() >= _free_list[index].MaxSize())
  {
    ListTooLong(_free_list[index], size);
  }
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
  //取一段内存 给Central Cache
  void* start = nullptr;
  void* end = nullptr;
  list.PopRound(start, end, list.MaxSize());
  CentralCache::GetInstace()->ReleaseListToSpans(start, size);
}

Central Cache

Central Cache设计框架

图片

Central Cache主要为Thread Cache提供服务,哈希映射规则采用的是按字节进行映射。经过一定的规则对齐后的映射。

central cache的结构与thread cache是一样的,它们都是哈希桶的结构,并且它们遵循的对齐映射规则都是一样的。这样做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了。

Cecntral Cache的结构和Thread Cache是一样的,但是,Thread Cache是每一个线程独享的,Central Cache是多线程共享的,在整个内存池中只有一个Central Cache,再实现的时候,将其设计成为单例模式更为合理。

每个线程的Thread Cache对象没有内存时都会向Central Cache进行申请,多线程环境下,要注意加锁问题。Central Cache在加锁的时候,并不是将整个Central Cache都加上锁,可以设计成为桶锁。只有当多线程同时从一个桶去申请内存时,才会有互斥的行为。比如线程1申请8Byte内存,线程2申请256kb内存,俩个线程不是从同一个桶中申请内存的,这种情况这两个线程就不用存在互斥关系,可以并发同时进行申请。这也是内存池高效的原因之一。

Thead Cache的每一个桶都是对应大小的自由链表,而Central Cache的桶则是一个一个的span对象。sapn管理的是以页为单位的大块内存块。用双向带头循环链表的方式将这些span管理起来。而每个span还有自由链表对象。

每个进程都有自己的进程地址空间,32位下,进程地址空间最大是2^32,也就是4Gb。而64位下,进程地址空间最大是2^64。这些地址空间,按页为单位进行划分,一页普遍是8kb。64位下,一共有2^64 / 2^13 = 2^51页。管理页号这里,32位下和64位下,使用不同的类型记录。32位下,size_t类型完全足够,但是64位下,size_t 类型远远不够。64位下用unsigned long long类型保存(表示无符号的 64 位整数)。

存储页号,使用条件编译解决。当然,实际上不可能有这么多的页号,进程地址空间的地址是虚拟的,实际上没有这么多,64位下,2^64字节,16TB的地址空间大小,一般计算机很难有这个水平的,大公司的服务器一般才有这个配置。虚拟地址就是每个进程都“以为”自己能有16tb,但是一般普通的计算机实际根本到不了。还是根据实际的物理内存来的。尽管是虚拟的,也要按照最大的来进行维护页号。

#ifndef _WIN64
  typedef unsigned long long PAGE_ID;
#elif _WIN32
  typedef size_t PAGE_ID;
#endif // !_WIN64

sapn

sapn描述是以页为单位的大块内存的

//管理以页为单位的大块内存
struct Span
{
  PAGE_ID _page_id = 0; //页号
  size_t _page_num = 0;//页的数量

  Span* _prev = nullptr;
  Span* _next = nullptr;

  size_t _obj_size = 0;//对象大小 
  size_t _use_count = 0;
  void* _free_list = nullptr;
  bool _is_use = false;//span对象是否被使用
};

span描述的以页为单位的大块内存,我们需要知道这块内存具体在哪一个位置,便于之后page cache进行前后页的合并,因此span结构当中会记录所管理大块内存起始页的页号。

至于每一个span管理的到底是多少个页,这并不是固定的,需要根据多方面的因素来控制,因此span结构当中有一个_page_num成员,该成员就代表着该span管理的页的数量。

span中对象大小属性,在释放的时候,根据地址获取页号,再根据页号获取对应的span,从sapn中获取到对象的大小,再释放的时候直接根据地址就可以确定大小了,不用再释放的时候也传对象大小。

每个span管理的大块内存,都会被切成相应大小的内存块挂到当前span的自由链表中,比如8Byte哈希桶中的span,会被切成一个个8Byte大小的内存块挂到当前span的自由链表中,因此span结构中需要存储切好的小块内存的自由链表_free_list。

在Page Cache中会进行合并操作,需要记录一下当前span是否正在被使用,如果正在使用就无法进行合并操作。

将这些sapn对象以循环双链表组织起来,做为Central Cache的成员变量。

class SpanList
{
public:
  SpanList()
  {
    _head = new Span;
    _head->_next = _head;
    _head->_prev = _head;
  }

  Span* Begin()
  {
    return _head->_next;
  }

  Span* End()
  {
    return _head;
  }

  void Insert(Span* cur, Span* newspan)
  {
    assert(cur != _head);
    assert(newspan != nullptr);

    Span* prev = cur->_prev;
    prev->_next = newspan;
    newspan->_prev = prev;
    newspan->_next = cur;
    cur->_prev = newspan;
  }

  void Erase(Span* cur)
  {
    assert(cur != nullptr);
    assert(cur != _head);

    Span* prev = cur->_prev;
    Span* next = cur->_next;
    prev->_next = next;
    next->_prev = prev;
  }

  bool Empty()
  {
    return _head->_next == _head;
  }

  void PushFront(Span* newspan)
  {
    assert(newspan != nullptr);
    Insert(Begin(), newspan);
  }

  Span* PopFront()
  {
    Span* front = _head->_next;
    Erase(front);
    return front;
  }

private:
  Span* _head = nullptr;
public:
  std::mutex _bucket_lock;//桶锁
};

Central Cache 实现

Central Cache在整个进程中只能存在一份,将其设计成为单例。

#pragma once
#include "Common.h"
//单例
class CentralCache
{
public:
  static CentralCache* GetInstace()
  {
    return &_inst;
  }

  CentralCache(const CentralCache&) = delete;
  CentralCache& operator=(const CentralCache) = delete;

  // 从中心缓存获取一定数量的对象给thread cache
  size_t FetchRangeObj(void*& start, void*& end, size_t num, size_t size);

  // 获取一个非空的span
  Span* GetOneSpan(SpanList& list, size_t byte_size);

  // 将一定数量的对象释放到span对象
  void ReleaseListToSpans(void* start, size_t byte_size);

private:
  CentralCache() {}
  SpanList _span_lists[NFREELISTS]; // 按对齐方式映射
  static CentralCache _inst;//静态成员变量 一定要在源文件中定义 这里只是声明
};

Central Cache类主要的成员就是_span_lists双向循环链表。

FetchRangeObj实现

Central Cache申请内存

size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batch_num, size_t size)
{
  //中心缓存和thread cache的哈希结构是一样的使用同样的方式确定下标
  size_t index = SizeClass::Index(size);
  //加桶锁
  _span_lists[index]._bucket_lock.lock();
  //从哈希桶中获取span对象(还是加锁的)
  Span* span = GetOneSpan(_span_lists[index], size);
  assert(span != nullptr);
  assert(span->_free_list != nullptr);

  //从Span中获取num个内存块
  //span中可能不够num个 有多少申请多少 
  //span可能很多 最多要batch_num个(thread cache一次批量申请的个数)
  start = span->_free_list;
  end = start;
  size_t i = 0;
  size_t actual_num = 1;
  while (NextObj(end)!= nullptr  && (i < batch_num - 1))
  {
    end = NextObj(end);
    actual_num++;
    ++i;
  }
  //更新span中的free_list 和use_count
  span->_free_list = NextObj(end);
  NextObj(end) = nullptr;
  span->_use_count += actual_num;
  _span_lists[index]._bucket_lock.unlock();  

  return actual_num;
}

注意: 从哈希桶中申请sapn对象时就要加桶锁了。

GetOneSpan实现

GetOnespan主要的功能是从Central Cache中的哈希桶中获取对应的span对象,如果Central Cache的哈希桶中有,直接返回即可,如果没有就需要从Page Cache中获取span对象了, 通过Page Cache的NewSpan接口获取(这个接口在Page Cache会详细介绍)。

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
  //遍历SpanList 查找未使用的Span
  Span* iter = list.Begin();
  while (iter != list.End())
  {
    if (iter->_free_list != nullptr)
    {
      return iter;
    }
    else
    {
      iter = iter->_next;
    }
  }
  //从pagecache中获取内存时先解锁,其他线程释放内存的时候不受影响
  list._bucket_lock.unlock(); 

  //没有Span对象 去Page Cache中申请  从page cache中申请时要加锁。
  PageCache::GetInstance()->_page_mtx.lock();
  Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
  span->_is_use = true;
  span->_obj_size = size;
  PageCache::GetInstance()->_page_mtx.unlock();

  //切分的时候不用加锁  
  //对span进行切分 需要知道大块内存的起始地址和大块内存的字节数
  char* start = (char*)(span->_page_id << PAGE_SHIFT);//页号*8kb就是起始地址
  size_t byte_size = span->_page_num << PAGE_SHIFT;//页的数量*8kb就是字节数
  char* end = start + byte_size;
  //将大块内存切分成小块内存挂到自由链表下面
  //尾插 头插的话地址不连续 是反的
  span->_free_list = start;
  start += size;
  void* tail = span->_free_list;
  while (start < end)
  {
    NextObj(tail) = start;
    tail = start;
    start += size;
  }
  NextObj(tail) = nullptr;

  //将切好的span插入到spanlist中 (要加桶锁) 恢复锁 FetchRangeObj 会解锁
  list._bucket_lock.lock();
  list.PushFront(span);
  return span;
}

如果Central Cache中有span对象,那么直接返回给FetchRangeObj即可。如果没有,从Page Cache中获取到span对象,就需要注意以下事情:

  1. 从Page Cache获取span对象的时候,应该把桶锁先解除了。因为Central Cache是多线程共享的临界资源。比如线程1申请8kb,内存时,Thread Cache没有,就需要加桶锁向Central Cache中申请。如果此时线程2正好释放8kb内存时,由于线程2的8kb自由链表太长,需要还给Central Cache。遇到这种情况时,如果线程1不解锁,线程2释放内存时就会阻塞挂起,理论上线程2不应该被挂起。所以再向Page Cahce申请内存时,应该先解锁。等从Page Cache申请完内存后,把申请到的span需要插入到SpanList时再把锁恢复了。

  2. 从Page Cache申请到span对象后,需要把sapn管理的大块内存进行切分,切分成为申请对象大小对应的大小内存块插入到该span的自由链表下(说的比较别扭一点了,简单说就是比如申请8kb时,把span管理的大块内存块切分成8kb大小的内存块挂到自由链表下)。

具体得切分逻辑如下:

进行切分得时候不用加锁,因为其他线程不会执行到这得。

图片

//切分的时候不用加锁  
//对span进行切分 需要知道大块内存的起始地址和大块内存的字节数
char* start = (char*)(span->_page_id << PAGE_SHIFT);//页号*8kb就是起始地址
size_t byte_size = span->_page_num << PAGE_SHIFT;//页的数量*8kb就是字节数
char* end = start + byte_size;
//将大块内存切分成小块内存挂到自由链表下面
//尾插 头插的话地址不连续 是反的
span->_free_list = start;
start += size;
void* tail = span->_free_list;
while (start < end)
{
        NextObj(tail) = start;
        tail = start;
        start += size;
}
NextObj(tail) = nullptr;

ReleaseListToSpans实现

ReleaseListToSpans主要功能是把Thread Cache回收回来得内存存放到对应span对象下得自由链表中。

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
  size_t index = SizeClass::Index(size);
  _span_lists[index]._bucket_lock.lock();
  while (start != nullptr)
  {
    //把内存块挂到对应的span对象中 需要知道内存块和span的映射关系
    Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
    void* next = NextObj(start);
    //头插到sapn下的freelist中
    *(void**)start = span->_free_list;
    span->_free_list = start;
    span->_use_count--;
    //sapn下的所有小块内存都回来了 还给pagecache 进行合并 解决内存碎片问题
    if (span->_use_count == 0)
    {
      _span_lists[index].Erase(span);
      span->_free_list = nullptr;
      span->_next = span->_prev = nullptr;

      _span_lists[index]._bucket_lock.unlock();
      PageCache::GetInstance()->_page_mtx.lock();
      PageCache::GetInstance()->ReleaseSpanToPageCache(span);
      PageCache::GetInstance()->_page_mtx.unlock();
      _span_lists[index]._bucket_lock.lock();//恢复锁
    }
    start = next;
  }
  _span_lists[index]._bucket_lock.unlock();
}

Page Cache

Page Cache 设计框架:

图片

Page Cache是一个哈希桶的结构。每个桶下面都是一个个span对象。Page Cache主要向Central Cache提供服务,直接管理SpanList向Central Cache提供span对象即可。

整个Page Cache又是以页为单位来进行映射的。比如1页的映射到0号桶。128页映射到127号桶。这样映射的时候需要注意下标位置。可以多开一个空间,0-129号下标,第一个桶不用,让1页的映射到1号桶,2页的射到2号桶,依次类推。更加简单。

当前内存池支持单个对象最大是256Kb,128页刚好可以被切分成为4个256Kb对象,所以128页完全足够.

Page Cache实现

整个进程中只能存在一个Page Cache,将Page Cache设计成为单例。

#pragma once
#include "Common.h"
#include "ObjectPool.h"
//单例
class PageCache
{
public:
  PageCache(const PageCache&) = delete;
  PageCache& operator=(const PageCache) = delete;

  //获取单例对象
  static PageCache* GetInstance()
  {
    return &_inst;
  }

  //获取size页的Span给中心缓存
  Span* NewSpan(size_t size);

  // 获取从对象到span的映射
  Span* MapObjectToSpan(void* obj);

  // 释放空闲span回到Pagecache,并合并相邻的span
  void ReleaseSpanToPageCache(Span* span);
  std::mutex _page_mtx;
private:
  PageCache() {}
  static PageCache _inst;//只是声明
  SpanList _span_lsit[PAGE_NUMS];
  std::unordered_map<PAGE_ID, Span*> _id_span_map;
  ObjectPool<Span> _span_pool;
};

其中Page Cache的属性有:

Page Cache提供的方法有:

NewSpan实现

NewSpan的主要功能是向Central Cache的GetOneSpan函数获取span对象的。

当进程申请大于128页的内存时,就不会经过Thread Cache 和 Central Cache,直接向PageCache申请即可。而PageCache在申请时,直接从堆区申请即可。

从堆区申请按页为单位进行申请,申请到大块内存后,交给span对象进行管理。

当小于128页时,就会从对应的桶下进行查找,如果有就返回,没有就去下一个桶进行查找,,直到遍历完整个哈希表后都没有,就去堆区申请。

当去下一个桶查找时发现有的时候,就进行切分。比如申请5页的,遍历到10页后发现有未使用的span对象。就把这个10页的span进行切分,切分成两个5页的,一个返回给Central Cache,一个挂到5页的桶下面管理起来即可。

//获取一个size页的span
Span* PageCache::NewSpan(size_t size)
{
  if (size > PAGE_NUMS -1)//大于128页 
  {
    void* ptr = SystemAlloc(size);
    //申请的内存交给span管理
    Span* span = _span_pool.New();
    //根据地址确定页号
    span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
    span->_page_num = size;
    _id_span_map[span->_page_id] = span;
    _id_span_map[span->_page_id + span->_page_num - 1] = span;
    return span;
  }

  //SpanList中有就返回 没有就从堆中申请
  if (!_span_lsit[size].Empty())
  {
    //建立id和span的映射关系
    Span* kspan = _span_lsit[size].PopFront();
    for (PAGE_ID i = 0; i < kspan->_page_num; ++i)
    {
      _id_span_map[kspan->_page_id + i] = kspan;
    }
    return kspan;
  }

  //检查下一个桶有没有
  for (int i = size + 1; i < PAGE_NUMS; ++i)
  {
    if (!_span_lsit[i].Empty())
    {
      //切分
      Span* nspan = _span_lsit[i].PopFront();
      Span* kspan = _span_pool.New();

      kspan->_page_id = nspan->_page_id;
      kspan->_page_num = size;

      nspan->_page_id += size;
      nspan->_page_num -= size;
      //将切分剩余的页头插到对应的桶下面
      _span_lsit[nspan->_page_num].PushFront(nspan);
      //建立nspan首尾页号的映射关系 方便page cache回收内存时进行合并查找
      _id_span_map[nspan->_page_id] = nspan;
      _id_span_map[nspan->_page_id + nspan->_page_num - 1] = nspan;
      //建立id和span的映射关系
      for (PAGE_ID i = 0; i < kspan->_page_num; ++i)
      {
        _id_span_map[kspan->_page_id + i] = kspan;
      }
      return kspan;
    }
  }

  //从堆区申请
  Span* span = _span_pool.New();
  void* ptr = SystemAlloc(PAGE_NUMS - 1);
  //根据地址确定页号  地址除8*1024即可(一页8kb)
  span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
  span->_page_num = PAGE_NUMS - 1;
  //插入到128号桶下
  _span_lsit[span->_page_num].PushFront(span);
  //切分 递归调用自己进行切分即可,不用再写切分逻辑了
  return NewSpan(size);
}

注意:

Page Cache 申请内存

获取页号时只能根据地址来确定页号。PageCache申请内存时都以页为单位从堆区获取。堆区申请时返回的是页的起始地址,这就涉及到如何从地址转换到页。

假设申请到的地址是0x00120000,他对应的页号就是0x00120000 除以8Kb(除8Kb直接右移13位即可)对应的是144页。

144页存放的地址就是0x00120000到0x00122000。这8kb的地址。(2000是16进制,转10进制就是8192)。

申请到的内存要与span建立映射关系,便于删除时进行合并操作。

当整个PageCache 没有对应的内存大小时,从堆区进行申请后,要进行切分操作。比如第一次申请8页大小时,整个PageCache是没有内存的,直接从堆区申请128页的内存,这时就需要切分。将128页的内存切分为一个8页的返回给Central Cache,剩余的120页挂到120号桶下即可。切分的时候直接递归调用自己即可,不用再写切分逻辑。

MapObjectToSpan实现:

MapObjectToSpan函数就是获取页号与管理该页span对象的映射关系。方便后续进行合并查找。

获取页号时,同样也需要通过地址确定页号。确定完页号后,在unordered_map中查找该span对象即可。

Span* PageCache::MapObjectToSpan(void* obj)
{
  std::unique_lock<std::mutex> lock(_page_mtx);
  //根据地址获取页号
  PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

  auto ret = _id_span_map.find(id);
  if (ret != _id_span_map.end())
  {
    return ret->second;
  }
  else
  {
    return nullptr;
  }  
}

ReleaseSpanToPageCache

释放空间,并且进行合并相邻的span对象。

释放空间的时候,根据释放对象大小确定释放规则。

Central Cache把内存块还给PageCache的时候,要把小块内存合并成大块内存,解决内存碎片问题。

图片

void PageCache::ReleaseSpanToPageCache(Span* span)
{
  size_t size = span->_page_num;
  if (size > PAGE_NUMS - 1)
  {
    //根据页号计算地址
    void* ptr = (void*)(span->_page_id << PAGE_SHIFT);
    SystemFree(ptr);
    _span_pool.Delete(span);
    return;
  }

  //向前合并
  while (1)
  {
    PAGE_ID prev_span_page_id = span->_page_id - 1;
    auto ret = _id_span_map.find(prev_span_page_id);
    if (ret == _id_span_map.end())
    {
      break;
    }
    Span* prev_span = ret->second;
    if (prev_span->_is_use)
    {
      break;
    }
    if (prev_span->_page_num + span->_page_num > PAGE_NUMS - 1)
    {
      break;
    }
    //合并
    span->_page_num += prev_span->_page_num;
    span->_page_id = prev_span->_page_id;
    _span_lsit[prev_span->_page_num].Erase(prev_span);
    _span_pool.Delete(prev_span);
  }

  //向后合并
  while (1)
  {
    PAGE_ID next_span_page_id = span->_page_id + span->_page_num;
    auto ret = _id_span_map.find(next_span_page_id);
    if (ret == _id_span_map.end())
    {
      break;
    }
    Span* next_span = ret->second;
    if (next_span->_is_use)
    {
      break;
    }
    if (span->_page_num + next_span->_page_num > PAGE_NUMS - 1)
    {
      break;
    }

    //合并
    span->_page_num += next_span->_page_num;
    _span_lsit[next_span->_page_num].Erase(next_span);
    _span_pool.Delete(span);
  }

  //将合并好的插入到对应的页号下
  _span_lsit[span->_page_num].PushFront(span);
  span->_is_use = false;
  //更新页的首尾映射关系
  _id_span_map[span->_page_id] = span;
  _id_span_map[span->_page_id + span->_page_num - 1] = span;
}

Page Cache释放内存

释放内存到堆区时,是以页为单位进行释放的,需要根据页号确定地址,比如144页,144页左移13位(*8Kb)就能确定144页的起始地址了。

性能测试

内存池对比malloc函数进行对比。同时启动相同数量的线程,执行一样的轮次进行申请释放。比如同时启动四个线程,并发执行10000轮次,每次执行申请释放内存50次。

#include"ConcurrentAlloc.h"
#include <atomic>

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
  std::vector<std::thread> vthread(nworks);
  std::atomic<size_t> malloc_costtime = 0;
  std::atomic<size_t> free_costtime = 0;

  for (size_t k = 0; k < nworks; ++k)
  {
    vthread[k] = std::thread([&, k]() {
      std::vector<void*> v;
      v.reserve(ntimes);

      for (size_t j = 0; j < rounds; ++j)
      {
        size_t begin1 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          v.push_back(malloc(16));
          //v.push_back(malloc((16 + i) % 8192 + 1));
        }
        size_t end1 = clock();

        size_t begin2 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          free(v[i]);
        }
        size_t end2 = clock();
        v.clear();

        malloc_costtime += (end1 - begin1);
        free_costtime += (end2 - begin2);
      }
      });
  }

  for (auto& t : vthread)
  {
    t.join();
  }

  printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%zu ms\n",
    nworks, rounds, ntimes, malloc_costtime.load());

  printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
    nworks, rounds, ntimes, free_costtime.load());

  printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
    nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
  std::vector<std::thread> vthread(nworks);
  std::atomic<size_t> malloc_costtime = 0;
  std::atomic<size_t> free_costtime = 0;

  for (size_t k = 0; k < nworks; ++k)
  {
    vthread[k] = std::thread([&]() {
      std::vector<void*> v;
      v.reserve(ntimes);

      for (size_t j = 0; j < rounds; ++j)
      {
        size_t begin1 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          v.push_back(ConcurrentAlloc(16));
          //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
        }
        size_t end1 = clock();

        size_t begin2 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          ConcurrentFree(v[i]);
        }
        size_t end2 = clock();
        v.clear();

        malloc_costtime += (end1 - begin1);
        free_costtime += (end2 - begin2);
      }
      });
  }

  for (auto& t : vthread)
  {
    t.join();
  }

  printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
    nworks, rounds, ntimes, malloc_costtime.load());

  printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
    nworks, rounds, ntimes, free_costtime.load());

  printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
    nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

int main()
{
  size_t n = 50;
  cout << "==========================================================" << endl;
  BenchmarkConcurrentMalloc(n, 4, 10000);
  cout << endl << endl;

  BenchmarkMalloc(n, 4, 10000);
  cout << "==========================================================" << endl;

  return 0;
}

运行结果:

图片

嗯?内存池的效率还不如malloc。用性能检测工具检测一下,看哪里的性能太差。这里使用vs自带的性能检测工具。

图片

图片

选择这个选项,可以查看每个函数调用的时间,就可以进一步定位问题了。

通过这个检测工具发现,释放的时候获取页号和span对象的这个函数占用时间最长。

图片

进入MapObjectToSpan函数后,发现,消耗最多的是加锁问题。因为这里使用了STL中的unordered_map进行映射的。而STL本身是线程不安全的,必须要加锁访问。(unordered_map底层是哈希表,会发生动态扩容,扩容的时候必须要加锁,而map底层是红黑树,红黑树涉及到旋转问题,也必须要加锁)

图片

解决这个问题,参考tcmalloc使用基数树进行优化。基数树本质解决的是扩容问题,让哈希表底层是固定的,不会发生动态扩容。这样,再访问时就不用担心其他线程插入数据发发生扩容,而获取到"脏数据"的问题。

使用基数树优化

基数树可以做到获取映射时,无锁访问。基数树实际采用的就是直接定址法,每一个页号对应span对象的地址就存储数组中在以该页号为下标的位置。

图片

每一个页都需要映射一个sapn对象,这样在32位下,一页8kb计算,一共有2^19个页。一个地址4个字节,也就是存储他们的映射关系就需要2^19*4字节。即2Mb的空间。消耗的资源还可以。

template<int BITS>
class TCMalloc_PageMap1
{
public:
  typedef uintptr_t Number;

  explicit TCMalloc_PageMap1()
  {
    size_t size = sizeof(void*) << BITS;//需要开辟数组的大小
    _array = (void**)SystemAlloc(size >> PAGE_SHIFT);
    memset(_array, 0, size);
  }

  void* get(Number k) const
  {
    if ((k >> BITS) > 0)
    {
      return nullptr;
    }
    return _array[k];
  }

  void set(Number k,void* v)
  {
    assert(k >> BITS == 0);
    _array[k] = v;//建立映射
  }

private:
  void** _array;
  static const int LENGTH = 1 << BITS;
};

基数树采用非类型模板参数,对于32位系统,PageCache创建哈希表对象时,直接传32-13即可。就是19。就是页个数的次方。(也可以直接固定死)TCMalloc_PageMap1<32 - PAGE_SHIFT> _id_span_map;

使用基数树替换unordered_map。建立映射时,调用set即可,获取映射关系时调用get即可。

比如建立span下的页号和span的映射关系:

_id_span_map.set(span->_page_id,span);

获取页号和span的关系:

Span* ret = (Span*)_id_span_map.get(span->_page_id);

替换完基数树后的测试结果:比使用unordered_map快了9倍,比malloc快了3倍。(DEBUG版本下)。

图片

测性能DEBUG并不能很好的测试,使用RELEASE版本进行测试:

图片

图片

对于32位机器来说,使用基数树是完全没问题的,但是64位机器下,共有2^51页,开空间就要2^51 * 8字节。也就是16,384TB,很显然,不可能开这么多。这个问题tcmalloc解决方式是,采用多层基数树。上面的基数树称之为单层基数树。还有二层基数树。二层基数树就是进行两次哈希映射。

图片

比如在32位下,存储页号需要2^19字节。而二层基数树实际上就是把这些字节分为两次进行映射。

将单层基数一次开得2mb空间,划分成多个区域,比如以4096页为单位划分,每4096页需要一个二层哈希表进行映射,第一层需要2^19 / 2^12 = 128字节即可。第二层,当开到对应得页号时在申请空间进行映射(一般不可能申请2^19页这么多)。这样就能减少空间开辟了。

如果在64位机器下,二层基数树,依然无法解决问题。采用三层基数树,进一步划分即可。三层基数树就是进一步划分区域,分位三层哈希。