sharedmemory.py 17 KB


  1. # copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # utils for memory management which is allocated on sharedmemory,
  15. # note that these structures may not be thread-safe
  16. from __future__ import absolute_import
  17. from __future__ import division
  18. from __future__ import print_function
  19. from __future__ import unicode_literals
  20. import os
  21. import time
  22. import math
  23. import struct
  24. import sys
  25. import six
  26. if six.PY3:
  27. import pickle
  28. else:
  29. import cPickle as pickle
  30. import json
  31. import uuid
  32. import random
  33. import numpy as np
  34. import weakref
  35. import logging
  36. from multiprocessing import Lock
  37. from multiprocessing import RawArray
  38. logger = logging.getLogger(__name__)
  39. class SharedMemoryError(ValueError):
  40. """ SharedMemoryError
  41. """
  42. pass
  43. class SharedBufferError(SharedMemoryError):
  44. """ SharedBufferError
  45. """
  46. pass
  47. class MemoryFullError(SharedMemoryError):
  48. """ MemoryFullError
  49. """
  50. def __init__(self, errmsg=''):
  51. super(MemoryFullError, self).__init__()
  52. self.errmsg = errmsg
  53. def memcopy(dst, src, offset=0, length=None):
  54. """ copy data from 'src' to 'dst' in bytes
  55. """
  56. length = length if length is not None else len(src)
  57. assert type(dst) == np.ndarray, 'invalid type for "dst" in memcopy'
  58. if type(src) is not np.ndarray:
  59. if type(src) is str and six.PY3:
  60. src = src.encode()
  61. src = np.frombuffer(src, dtype='uint8', count=len(src))
  62. dst[:] = src[offset:offset + length]
  63. class SharedBuffer(object):
  64. """ Buffer allocated from SharedMemoryMgr, and it stores data on shared memory
  65. note that:
  66. every instance of this should be freed explicitely by calling 'self.free'
  67. """
  68. def __init__(self, owner, capacity, pos, size=0, alloc_status=''):
  69. """ Init
  70. Args:
  71. owner (str): manager to own this buffer
  72. capacity (int): capacity in bytes for this buffer
  73. pos (int): page position in shared memory
  74. size (int): bytes already used
  75. alloc_status (str): debug info about allocator when allocate this
  76. """
  77. self._owner = owner
  78. self._cap = capacity
  79. self._pos = pos
  80. self._size = size
  81. self._alloc_status = alloc_status
  82. assert self._pos >= 0 and self._cap > 0, \
  83. "invalid params[%d:%d] to construct SharedBuffer" \
  84. % (self._pos, self._cap)
  85. def owner(self):
  86. """ get owner
  87. """
  88. return SharedMemoryMgr.get_mgr(self._owner)
  89. def put(self, data, override=False):
  90. """ put data to this buffer
  91. Args:
  92. data (str): data to be stored in this buffer
  93. Returns:
  94. None
  95. Raises:
  96. SharedMemoryError when not enough space in this buffer
  97. """
  98. assert type(data) in [str, bytes], \
  99. 'invalid type[%s] for SharedBuffer::put' % (str(type(data)))
  100. if self._size > 0 and not override:
  101. raise SharedBufferError('already has already been setted before')
  102. if self.capacity() < len(data):
  103. raise SharedBufferError('data[%d] is larger than size of buffer[%s]'\
  104. % (len(data), str(self)))
  105. self.owner().put_data(self, data)
  106. self._size = len(data)
  107. def get(self, offset=0, size=None, no_copy=True):
  108. """ get the data stored this buffer
  109. Args:
  110. offset (int): position for the start point to 'get'
  111. size (int): size to get
  112. Returns:
  113. data (np.ndarray('uint8')): user's data in numpy
  114. which is passed in by 'put'
  115. None: if no data stored in
  116. """
  117. offset = offset if offset >= 0 else self._size + offset
  118. if self._size <= 0:
  119. return None
  120. size = self._size if size is None else size
  121. assert offset + size <= self._cap, 'invalid offset[%d] '\
  122. 'or size[%d] for capacity[%d]' % (offset, size, self._cap)
  123. return self.owner().get_data(self, offset, size, no_copy=no_copy)
  124. def size(self):
  125. """ bytes of used memory
  126. """
  127. return self._size
  128. def resize(self, size):
  129. """ resize the used memory to 'size', should not be greater than capacity
  130. """
  131. assert size >= 0 and size <= self._cap, \
  132. "invalid size[%d] for resize" % (size)
  133. self._size = size
  134. def capacity(self):
  135. """ size of allocated memory
  136. """
  137. return self._cap
  138. def __str__(self):
  139. """ human readable format
  140. """
  141. return "SharedBuffer(owner:%s, pos:%d, size:%d, "\
  142. "capacity:%d, alloc_status:[%s], pid:%d)" \
  143. % (str(self._owner), self._pos, self._size, \
  144. self._cap, self._alloc_status, os.getpid())
  145. def free(self):
  146. """ free this buffer to it's owner
  147. """
  148. if self._owner is not None:
  149. self.owner().free(self)
  150. self._owner = None
  151. self._cap = 0
  152. self._pos = -1
  153. self._size = 0
  154. return True
  155. else:
  156. return False
  157. class PageAllocator(object):
  158. """ allocator used to malloc and free shared memory which
  159. is split into pages
  160. """
  161. s_allocator_header = 12
  162. def __init__(self, base, total_pages, page_size):
  163. """ init
  164. """
  165. self._magic_num = 1234321000 + random.randint(100, 999)
  166. self._base = base
  167. self._total_pages = total_pages
  168. self._page_size = page_size
  169. header_pages = int(
  170. math.ceil((total_pages + self.s_allocator_header) / page_size))
  171. self._header_pages = header_pages
  172. self._free_pages = total_pages - header_pages
  173. self._header_size = self._header_pages * page_size
  174. self._reset()
  175. def _dump_alloc_info(self, fname):
  176. hpages, tpages, pos, used = self.header()
  177. start = self.s_allocator_header
  178. end = start + self._page_size * hpages
  179. alloc_flags = self._base[start:end].tostring()
  180. info = {
  181. 'magic_num': self._magic_num,
  182. 'header_pages': hpages,
  183. 'total_pages': tpages,
  184. 'pos': pos,
  185. 'used': used
  186. }
  187. info['alloc_flags'] = alloc_flags
  188. fname = fname + '.' + str(uuid.uuid4())[:6]
  189. with open(fname, 'wb') as f:
  190. f.write(pickle.dumps(info, -1))
  191. logger.warn('dump alloc info to file[%s]' % (fname))
  192. def _reset(self):
  193. alloc_page_pos = self._header_pages
  194. used_pages = self._header_pages
  195. header_info = struct.pack(
  196. str('III'), self._magic_num, alloc_page_pos, used_pages)
  197. assert len(header_info) == self.s_allocator_header, \
  198. 'invalid size of header_info'
  199. memcopy(self._base[0:self.s_allocator_header], header_info)
  200. self.set_page_status(0, self._header_pages, '1')
  201. self.set_page_status(self._header_pages, self._free_pages, '0')
  202. def header(self):
  203. """ get header info of this allocator
  204. """
  205. header_str = self._base[0:self.s_allocator_header].tostring()
  206. magic, pos, used = struct.unpack(str('III'), header_str)
  207. assert magic == self._magic_num, \
  208. 'invalid header magic[%d] in shared memory' % (magic)
  209. return self._header_pages, self._total_pages, pos, used
  210. def empty(self):
  211. """ are all allocatable pages available
  212. """
  213. header_pages, pages, pos, used = self.header()
  214. return header_pages == used
  215. def full(self):
  216. """ are all allocatable pages used
  217. """
  218. header_pages, pages, pos, used = self.header()
  219. return header_pages + used == pages
  220. def __str__(self):
  221. header_pages, pages, pos, used = self.header()
  222. desc = '{page_info[magic:%d,total:%d,used:%d,header:%d,alloc_pos:%d,pagesize:%d]}' \
  223. % (self._magic_num, pages, used, header_pages, pos, self._page_size)
  224. return 'PageAllocator:%s' % (desc)
  225. def set_alloc_info(self, alloc_pos, used_pages):
  226. """ set allocating position to new value
  227. """
  228. memcopy(self._base[4:12],
  229. struct.pack(str('II'), alloc_pos, used_pages))
  230. def set_page_status(self, start, page_num, status):
  231. """ set pages from 'start' to 'end' with new same status 'status'
  232. """
  233. assert status in ['0', '1'], 'invalid status[%s] for page status '\
  234. 'in allocator[%s]' % (status, str(self))
  235. start += self.s_allocator_header
  236. end = start + page_num
  237. assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
  238. 'in allocator[%s]' % (end, str(self))
  239. memcopy(self._base[start:end], str(status * page_num))
  240. def get_page_status(self, start, page_num, ret_flag=False):
  241. start += self.s_allocator_header
  242. end = start + page_num
  243. assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
  244. 'in allocator[%s]' % (end, str(self))
  245. status = self._base[start:end].tostring().decode()
  246. if ret_flag:
  247. return status
  248. zero_num = status.count('0')
  249. if zero_num == 0:
  250. return (page_num, 1)
  251. else:
  252. return (zero_num, 0)
  253. def malloc_page(self, page_num):
  254. header_pages, pages, pos, used = self.header()
  255. end = pos + page_num
  256. if end > pages:
  257. pos = self._header_pages
  258. end = pos + page_num
  259. start_pos = pos
  260. flags = ''
  261. while True:
  262. # maybe flags already has some '0' pages,
  263. # so just check 'page_num - len(flags)' pages
  264. flags = self.get_page_status(pos, page_num, ret_flag=True)
  265. if flags.count('0') == page_num:
  266. break
  267. # not found enough pages, so shift to next few pages
  268. free_pos = flags.rfind('1') + 1
  269. pos += free_pos
  270. end = pos + page_num
  271. if end > pages:
  272. pos = self._header_pages
  273. end = pos + page_num
  274. flags = ''
  275. # not found available pages after scan all pages
  276. if pos <= start_pos and end >= start_pos:
  277. logger.debug('not found available pages after scan all pages')
  278. break
  279. page_status = (flags.count('0'), 0)
  280. if page_status != (page_num, 0):
  281. free_pages = self._total_pages - used
  282. if free_pages == 0:
  283. err_msg = 'all pages have been used:%s' % (str(self))
  284. else:
  285. err_msg = 'not found available pages with page_status[%s] '\
  286. 'and %d free pages' % (str(page_status), free_pages)
  287. err_msg = 'failed to malloc %d pages at pos[%d] for reason[%s] and allocator status[%s]' \
  288. % (page_num, pos, err_msg, str(self))
  289. raise MemoryFullError(err_msg)
  290. self.set_page_status(pos, page_num, '1')
  291. used += page_num
  292. self.set_alloc_info(end, used)
  293. return pos
  294. def free_page(self, start, page_num):
  295. """ free 'page_num' pages start from 'start'
  296. """
  297. page_status = self.get_page_status(start, page_num)
  298. assert page_status == (page_num, 1), \
  299. 'invalid status[%s] when free [%d, %d]' \
  300. % (str(page_status), start, page_num)
  301. self.set_page_status(start, page_num, '0')
  302. _, _, pos, used = self.header()
  303. used -= page_num
  304. self.set_alloc_info(pos, used)
  305. DEFAULT_SHARED_MEMORY_SIZE = 1024 * 1024 * 1024
  306. class SharedMemoryMgr(object):
  307. """ manage a continouse block of memory, provide
  308. 'malloc' to allocate new buffer, and 'free' to free buffer
  309. """
  310. s_memory_mgrs = weakref.WeakValueDictionary()
  311. s_mgr_num = 0
  312. s_log_statis = False
  313. @classmethod
  314. def get_mgr(cls, id):
  315. """ get a SharedMemoryMgr with size of 'capacity'
  316. """
  317. assert id in cls.s_memory_mgrs, 'invalid id[%s] for memory managers' % (
  318. id)
  319. return cls.s_memory_mgrs[id]
  320. def __init__(self, capacity=None, pagesize=None):
  321. """ init
  322. """
  323. logger.debug('create SharedMemoryMgr')
  324. pagesize = 64 * 1024 if pagesize is None else pagesize
  325. assert type(pagesize) is int, "invalid type of pagesize[%s]" \
  326. % (str(pagesize))
  327. capacity = DEFAULT_SHARED_MEMORY_SIZE if capacity is None else capacity
  328. assert type(capacity) is int, "invalid type of capacity[%s]" \
  329. % (str(capacity))
  330. assert capacity > 0, '"size of shared memory should be greater than 0'
  331. self._released = False
  332. self._cap = capacity
  333. self._page_size = pagesize
  334. assert self._cap % self._page_size == 0, \
  335. "capacity[%d] and pagesize[%d] are not consistent" \
  336. % (self._cap, self._page_size)
  337. self._total_pages = self._cap // self._page_size
  338. self._pid = os.getpid()
  339. SharedMemoryMgr.s_mgr_num += 1
  340. self._id = self._pid * 100 + SharedMemoryMgr.s_mgr_num
  341. SharedMemoryMgr.s_memory_mgrs[self._id] = self
  342. self._locker = Lock()
  343. self._setup()
  344. def _setup(self):
  345. self._shared_mem = RawArray('c', self._cap)
  346. self._base = np.frombuffer(
  347. self._shared_mem, dtype='uint8', count=self._cap)
  348. self._locker.acquire()
  349. try:
  350. self._allocator = PageAllocator(self._base, self._total_pages,
  351. self._page_size)
  352. finally:
  353. self._locker.release()
  354. def malloc(self, size, wait=True):
  355. """ malloc a new SharedBuffer
  356. Args:
  357. size (int): buffer size to be malloc
  358. wait (bool): whether to wait when no enough memory
  359. Returns:
  360. SharedBuffer
  361. Raises:
  362. SharedMemoryError when not found available memory
  363. """
  364. page_num = int(math.ceil(size / self._page_size))
  365. size = page_num * self._page_size
  366. start = None
  367. ct = 0
  368. errmsg = ''
  369. while True:
  370. self._locker.acquire()
  371. try:
  372. start = self._allocator.malloc_page(page_num)
  373. alloc_status = str(self._allocator)
  374. except MemoryFullError as e:
  375. start = None
  376. errmsg = e.errmsg
  377. if not wait:
  378. raise e
  379. finally:
  380. self._locker.release()
  381. if start is None:
  382. time.sleep(0.1)
  383. if ct % 100 == 0:
  384. logger.warn('not enough space for reason[%s]' % (errmsg))
  385. ct += 1
  386. else:
  387. break
  388. return SharedBuffer(self._id, size, start, alloc_status=alloc_status)
  389. def free(self, shared_buf):
  390. """ free a SharedBuffer
  391. Args:
  392. shared_buf (SharedBuffer): buffer to be freed
  393. Returns:
  394. None
  395. Raises:
  396. SharedMemoryError when failed to release this buffer
  397. """
  398. assert shared_buf._owner == self._id, "invalid shared_buf[%s] "\
  399. "for it's not allocated from me[%s]" % (str(shared_buf), str(self))
  400. cap = shared_buf.capacity()
  401. start_page = shared_buf._pos
  402. page_num = cap // self._page_size
  403. #maybe we don't need this lock here
  404. self._locker.acquire()
  405. try:
  406. self._allocator.free_page(start_page, page_num)
  407. finally:
  408. self._locker.release()
  409. def put_data(self, shared_buf, data):
  410. """ fill 'data' into 'shared_buf'
  411. """
  412. assert len(data) <= shared_buf.capacity(), 'too large data[%d] '\
  413. 'for this buffer[%s]' % (len(data), str(shared_buf))
  414. start = shared_buf._pos * self._page_size
  415. end = start + len(data)
  416. assert start >= 0 and end <= self._cap, "invalid start "\
  417. "position[%d] when put data to buff:%s" % (start, str(shared_buf))
  418. self._base[start:end] = np.frombuffer(data, 'uint8', len(data))
  419. def get_data(self, shared_buf, offset, size, no_copy=True):
  420. """ extract 'data' from 'shared_buf' in range [offset, offset + size)
  421. """
  422. start = shared_buf._pos * self._page_size
  423. start += offset
  424. if no_copy:
  425. return self._base[start:start + size]
  426. else:
  427. return self._base[start:start + size].tostring()
  428. def __str__(self):
  429. return 'SharedMemoryMgr:{id:%d, %s}' % (self._id, str(self._allocator))
  430. def __del__(self):
  431. if SharedMemoryMgr.s_log_statis:
  432. logger.info('destroy [%s]' % (self))
  433. if not self._released and not self._allocator.empty():
  434. logger.debug('not empty when delete this SharedMemoryMgr[%s]' %
  435. (self))
  436. else:
  437. self._released = True
  438. if self._id in SharedMemoryMgr.s_memory_mgrs:
  439. del SharedMemoryMgr.s_memory_mgrs[self._id]
  440. SharedMemoryMgr.s_mgr_num -= 1