| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- # copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # utils for memory management which is allocated on sharedmemory,
- # note that these structures may not be thread-safe
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- from __future__ import unicode_literals
- import os
- import time
- import math
- import struct
- import sys
- import six
- if six.PY3:
- import pickle
- else:
- import cPickle as pickle
- import json
- import uuid
- import random
- import numpy as np
- import weakref
- import logging
- from multiprocessing import Lock
- from multiprocessing import RawArray
- logger = logging.getLogger(__name__)
- class SharedMemoryError(ValueError):
- """ SharedMemoryError
- """
- pass
- class SharedBufferError(SharedMemoryError):
- """ SharedBufferError
- """
- pass
- class MemoryFullError(SharedMemoryError):
- """ MemoryFullError
- """
- def __init__(self, errmsg=''):
- super(MemoryFullError, self).__init__()
- self.errmsg = errmsg
- def memcopy(dst, src, offset=0, length=None):
- """ copy data from 'src' to 'dst' in bytes
- """
- length = length if length is not None else len(src)
- assert type(dst) == np.ndarray, 'invalid type for "dst" in memcopy'
- if type(src) is not np.ndarray:
- if type(src) is str and six.PY3:
- src = src.encode()
- src = np.frombuffer(src, dtype='uint8', count=len(src))
- dst[:] = src[offset:offset + length]
- class SharedBuffer(object):
- """ Buffer allocated from SharedMemoryMgr, and it stores data on shared memory
- note that:
- every instance of this should be freed explicitely by calling 'self.free'
- """
- def __init__(self, owner, capacity, pos, size=0, alloc_status=''):
- """ Init
- Args:
- owner (str): manager to own this buffer
- capacity (int): capacity in bytes for this buffer
- pos (int): page position in shared memory
- size (int): bytes already used
- alloc_status (str): debug info about allocator when allocate this
- """
- self._owner = owner
- self._cap = capacity
- self._pos = pos
- self._size = size
- self._alloc_status = alloc_status
- assert self._pos >= 0 and self._cap > 0, \
- "invalid params[%d:%d] to construct SharedBuffer" \
- % (self._pos, self._cap)
- def owner(self):
- """ get owner
- """
- return SharedMemoryMgr.get_mgr(self._owner)
- def put(self, data, override=False):
- """ put data to this buffer
- Args:
- data (str): data to be stored in this buffer
- Returns:
- None
- Raises:
- SharedMemoryError when not enough space in this buffer
- """
- assert type(data) in [str, bytes], \
- 'invalid type[%s] for SharedBuffer::put' % (str(type(data)))
- if self._size > 0 and not override:
- raise SharedBufferError('already has already been setted before')
- if self.capacity() < len(data):
- raise SharedBufferError('data[%d] is larger than size of buffer[%s]'\
- % (len(data), str(self)))
- self.owner().put_data(self, data)
- self._size = len(data)
- def get(self, offset=0, size=None, no_copy=True):
- """ get the data stored this buffer
- Args:
- offset (int): position for the start point to 'get'
- size (int): size to get
- Returns:
- data (np.ndarray('uint8')): user's data in numpy
- which is passed in by 'put'
- None: if no data stored in
- """
- offset = offset if offset >= 0 else self._size + offset
- if self._size <= 0:
- return None
- size = self._size if size is None else size
- assert offset + size <= self._cap, 'invalid offset[%d] '\
- 'or size[%d] for capacity[%d]' % (offset, size, self._cap)
- return self.owner().get_data(self, offset, size, no_copy=no_copy)
- def size(self):
- """ bytes of used memory
- """
- return self._size
- def resize(self, size):
- """ resize the used memory to 'size', should not be greater than capacity
- """
- assert size >= 0 and size <= self._cap, \
- "invalid size[%d] for resize" % (size)
- self._size = size
- def capacity(self):
- """ size of allocated memory
- """
- return self._cap
- def __str__(self):
- """ human readable format
- """
- return "SharedBuffer(owner:%s, pos:%d, size:%d, "\
- "capacity:%d, alloc_status:[%s], pid:%d)" \
- % (str(self._owner), self._pos, self._size, \
- self._cap, self._alloc_status, os.getpid())
- def free(self):
- """ free this buffer to it's owner
- """
- if self._owner is not None:
- self.owner().free(self)
- self._owner = None
- self._cap = 0
- self._pos = -1
- self._size = 0
- return True
- else:
- return False
- class PageAllocator(object):
- """ allocator used to malloc and free shared memory which
- is split into pages
- """
- s_allocator_header = 12
- def __init__(self, base, total_pages, page_size):
- """ init
- """
- self._magic_num = 1234321000 + random.randint(100, 999)
- self._base = base
- self._total_pages = total_pages
- self._page_size = page_size
- header_pages = int(
- math.ceil((total_pages + self.s_allocator_header) / page_size))
- self._header_pages = header_pages
- self._free_pages = total_pages - header_pages
- self._header_size = self._header_pages * page_size
- self._reset()
- def _dump_alloc_info(self, fname):
- hpages, tpages, pos, used = self.header()
- start = self.s_allocator_header
- end = start + self._page_size * hpages
- alloc_flags = self._base[start:end].tostring()
- info = {
- 'magic_num': self._magic_num,
- 'header_pages': hpages,
- 'total_pages': tpages,
- 'pos': pos,
- 'used': used
- }
- info['alloc_flags'] = alloc_flags
- fname = fname + '.' + str(uuid.uuid4())[:6]
- with open(fname, 'wb') as f:
- f.write(pickle.dumps(info, -1))
- logger.warn('dump alloc info to file[%s]' % (fname))
- def _reset(self):
- alloc_page_pos = self._header_pages
- used_pages = self._header_pages
- header_info = struct.pack(
- str('III'), self._magic_num, alloc_page_pos, used_pages)
- assert len(header_info) == self.s_allocator_header, \
- 'invalid size of header_info'
- memcopy(self._base[0:self.s_allocator_header], header_info)
- self.set_page_status(0, self._header_pages, '1')
- self.set_page_status(self._header_pages, self._free_pages, '0')
- def header(self):
- """ get header info of this allocator
- """
- header_str = self._base[0:self.s_allocator_header].tostring()
- magic, pos, used = struct.unpack(str('III'), header_str)
- assert magic == self._magic_num, \
- 'invalid header magic[%d] in shared memory' % (magic)
- return self._header_pages, self._total_pages, pos, used
- def empty(self):
- """ are all allocatable pages available
- """
- header_pages, pages, pos, used = self.header()
- return header_pages == used
- def full(self):
- """ are all allocatable pages used
- """
- header_pages, pages, pos, used = self.header()
- return header_pages + used == pages
- def __str__(self):
- header_pages, pages, pos, used = self.header()
- desc = '{page_info[magic:%d,total:%d,used:%d,header:%d,alloc_pos:%d,pagesize:%d]}' \
- % (self._magic_num, pages, used, header_pages, pos, self._page_size)
- return 'PageAllocator:%s' % (desc)
- def set_alloc_info(self, alloc_pos, used_pages):
- """ set allocating position to new value
- """
- memcopy(self._base[4:12],
- struct.pack(str('II'), alloc_pos, used_pages))
- def set_page_status(self, start, page_num, status):
- """ set pages from 'start' to 'end' with new same status 'status'
- """
- assert status in ['0', '1'], 'invalid status[%s] for page status '\
- 'in allocator[%s]' % (status, str(self))
- start += self.s_allocator_header
- end = start + page_num
- assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
- 'in allocator[%s]' % (end, str(self))
- memcopy(self._base[start:end], str(status * page_num))
- def get_page_status(self, start, page_num, ret_flag=False):
- start += self.s_allocator_header
- end = start + page_num
- assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
- 'in allocator[%s]' % (end, str(self))
- status = self._base[start:end].tostring().decode()
- if ret_flag:
- return status
- zero_num = status.count('0')
- if zero_num == 0:
- return (page_num, 1)
- else:
- return (zero_num, 0)
- def malloc_page(self, page_num):
- header_pages, pages, pos, used = self.header()
- end = pos + page_num
- if end > pages:
- pos = self._header_pages
- end = pos + page_num
- start_pos = pos
- flags = ''
- while True:
- # maybe flags already has some '0' pages,
- # so just check 'page_num - len(flags)' pages
- flags = self.get_page_status(pos, page_num, ret_flag=True)
- if flags.count('0') == page_num:
- break
- # not found enough pages, so shift to next few pages
- free_pos = flags.rfind('1') + 1
- pos += free_pos
- end = pos + page_num
- if end > pages:
- pos = self._header_pages
- end = pos + page_num
- flags = ''
- # not found available pages after scan all pages
- if pos <= start_pos and end >= start_pos:
- logger.debug('not found available pages after scan all pages')
- break
- page_status = (flags.count('0'), 0)
- if page_status != (page_num, 0):
- free_pages = self._total_pages - used
- if free_pages == 0:
- err_msg = 'all pages have been used:%s' % (str(self))
- else:
- err_msg = 'not found available pages with page_status[%s] '\
- 'and %d free pages' % (str(page_status), free_pages)
- err_msg = 'failed to malloc %d pages at pos[%d] for reason[%s] and allocator status[%s]' \
- % (page_num, pos, err_msg, str(self))
- raise MemoryFullError(err_msg)
- self.set_page_status(pos, page_num, '1')
- used += page_num
- self.set_alloc_info(end, used)
- return pos
- def free_page(self, start, page_num):
- """ free 'page_num' pages start from 'start'
- """
- page_status = self.get_page_status(start, page_num)
- assert page_status == (page_num, 1), \
- 'invalid status[%s] when free [%d, %d]' \
- % (str(page_status), start, page_num)
- self.set_page_status(start, page_num, '0')
- _, _, pos, used = self.header()
- used -= page_num
- self.set_alloc_info(pos, used)
- DEFAULT_SHARED_MEMORY_SIZE = 1024 * 1024 * 1024
- class SharedMemoryMgr(object):
- """ manage a continouse block of memory, provide
- 'malloc' to allocate new buffer, and 'free' to free buffer
- """
- s_memory_mgrs = weakref.WeakValueDictionary()
- s_mgr_num = 0
- s_log_statis = False
- @classmethod
- def get_mgr(cls, id):
- """ get a SharedMemoryMgr with size of 'capacity'
- """
- assert id in cls.s_memory_mgrs, 'invalid id[%s] for memory managers' % (
- id)
- return cls.s_memory_mgrs[id]
- def __init__(self, capacity=None, pagesize=None):
- """ init
- """
- logger.debug('create SharedMemoryMgr')
- pagesize = 64 * 1024 if pagesize is None else pagesize
- assert type(pagesize) is int, "invalid type of pagesize[%s]" \
- % (str(pagesize))
- capacity = DEFAULT_SHARED_MEMORY_SIZE if capacity is None else capacity
- assert type(capacity) is int, "invalid type of capacity[%s]" \
- % (str(capacity))
- assert capacity > 0, '"size of shared memory should be greater than 0'
- self._released = False
- self._cap = capacity
- self._page_size = pagesize
- assert self._cap % self._page_size == 0, \
- "capacity[%d] and pagesize[%d] are not consistent" \
- % (self._cap, self._page_size)
- self._total_pages = self._cap // self._page_size
- self._pid = os.getpid()
- SharedMemoryMgr.s_mgr_num += 1
- self._id = self._pid * 100 + SharedMemoryMgr.s_mgr_num
- SharedMemoryMgr.s_memory_mgrs[self._id] = self
- self._locker = Lock()
- self._setup()
- def _setup(self):
- self._shared_mem = RawArray('c', self._cap)
- self._base = np.frombuffer(
- self._shared_mem, dtype='uint8', count=self._cap)
- self._locker.acquire()
- try:
- self._allocator = PageAllocator(self._base, self._total_pages,
- self._page_size)
- finally:
- self._locker.release()
- def malloc(self, size, wait=True):
- """ malloc a new SharedBuffer
- Args:
- size (int): buffer size to be malloc
- wait (bool): whether to wait when no enough memory
- Returns:
- SharedBuffer
- Raises:
- SharedMemoryError when not found available memory
- """
- page_num = int(math.ceil(size / self._page_size))
- size = page_num * self._page_size
- start = None
- ct = 0
- errmsg = ''
- while True:
- self._locker.acquire()
- try:
- start = self._allocator.malloc_page(page_num)
- alloc_status = str(self._allocator)
- except MemoryFullError as e:
- start = None
- errmsg = e.errmsg
- if not wait:
- raise e
- finally:
- self._locker.release()
- if start is None:
- time.sleep(0.1)
- if ct % 100 == 0:
- logger.warn('not enough space for reason[%s]' % (errmsg))
- ct += 1
- else:
- break
- return SharedBuffer(self._id, size, start, alloc_status=alloc_status)
- def free(self, shared_buf):
- """ free a SharedBuffer
- Args:
- shared_buf (SharedBuffer): buffer to be freed
- Returns:
- None
- Raises:
- SharedMemoryError when failed to release this buffer
- """
- assert shared_buf._owner == self._id, "invalid shared_buf[%s] "\
- "for it's not allocated from me[%s]" % (str(shared_buf), str(self))
- cap = shared_buf.capacity()
- start_page = shared_buf._pos
- page_num = cap // self._page_size
- #maybe we don't need this lock here
- self._locker.acquire()
- try:
- self._allocator.free_page(start_page, page_num)
- finally:
- self._locker.release()
- def put_data(self, shared_buf, data):
- """ fill 'data' into 'shared_buf'
- """
- assert len(data) <= shared_buf.capacity(), 'too large data[%d] '\
- 'for this buffer[%s]' % (len(data), str(shared_buf))
- start = shared_buf._pos * self._page_size
- end = start + len(data)
- assert start >= 0 and end <= self._cap, "invalid start "\
- "position[%d] when put data to buff:%s" % (start, str(shared_buf))
- self._base[start:end] = np.frombuffer(data, 'uint8', len(data))
- def get_data(self, shared_buf, offset, size, no_copy=True):
- """ extract 'data' from 'shared_buf' in range [offset, offset + size)
- """
- start = shared_buf._pos * self._page_size
- start += offset
- if no_copy:
- return self._base[start:start + size]
- else:
- return self._base[start:start + size].tostring()
- def __str__(self):
- return 'SharedMemoryMgr:{id:%d, %s}' % (self._id, str(self._allocator))
- def __del__(self):
- if SharedMemoryMgr.s_log_statis:
- logger.info('destroy [%s]' % (self))
- if not self._released and not self._allocator.empty():
- logger.debug('not empty when delete this SharedMemoryMgr[%s]' %
- (self))
- else:
- self._released = True
- if self._id in SharedMemoryMgr.s_memory_mgrs:
- del SharedMemoryMgr.s_memory_mgrs[self._id]
- SharedMemoryMgr.s_mgr_num -= 1
|