sharedmemory.py 18 KB


  1. # copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
# Utilities for managing memory that is allocated in shared memory.
# Note that these structures may not be thread-safe.
  16. from __future__ import absolute_import
  17. from __future__ import division
  18. from __future__ import print_function
  19. from __future__ import unicode_literals
  20. import os
  21. import time
  22. import math
  23. import struct
  24. import sys
  25. import six
  26. if six.PY3:
  27. import pickle
  28. else:
  29. import cPickle as pickle
  30. import json
  31. import uuid
  32. import random
  33. import numpy as np
  34. import weakref
  35. import logging
  36. from multiprocessing import Lock
  37. from multiprocessing import RawArray
  38. logger = logging.getLogger(__name__)
  39. class SharedMemoryError(ValueError):
  40. """ SharedMemoryError
  41. """
  42. pass
  43. class SharedBufferError(SharedMemoryError):
  44. """ SharedBufferError
  45. """
  46. pass
  47. class MemoryFullError(SharedMemoryError):
  48. """ MemoryFullError
  49. """
  50. def __init__(self, errmsg=''):
  51. super(MemoryFullError, self).__init__()
  52. self.errmsg = errmsg
  53. def memcopy(dst, src, offset=0, length=None):
  54. """ copy data from 'src' to 'dst' in bytes
  55. """
  56. length = length if length is not None else len(src)
  57. assert type(dst) == np.ndarray, 'invalid type for "dst" in memcopy'
  58. if type(src) is not np.ndarray:
  59. if type(src) is str and six.PY3:
  60. src = src.encode()
  61. src = np.frombuffer(src, dtype='uint8', count=len(src))
  62. dst[:] = src[offset:offset + length]
  63. class SharedBuffer(object):
  64. """ Buffer allocated from SharedMemoryMgr, and it stores data on shared memory
  65. note that:
  66. every instance of this should be freed explicitely by calling 'self.free'
  67. """
  68. def __init__(self, owner, capacity, pos, size=0, alloc_status=''):
  69. """ Init
  70. Args:
  71. owner (str): manager to own this buffer
  72. capacity (int): capacity in bytes for this buffer
  73. pos (int): page position in shared memory
  74. size (int): bytes already used
  75. alloc_status (str): debug info about allocator when allocate this
  76. """
  77. self._owner = owner
  78. self._cap = capacity
  79. self._pos = pos
  80. self._size = size
  81. self._alloc_status = alloc_status
  82. assert self._pos >= 0 and self._cap > 0, \
  83. "invalid params[%d:%d] to construct SharedBuffer" \
  84. % (self._pos, self._cap)
  85. def owner(self):
  86. """ get owner
  87. """
  88. return SharedMemoryMgr.get_mgr(self._owner)
  89. def put(self, data, override=False):
  90. """ put data to this buffer
  91. Args:
  92. data (str): data to be stored in this buffer
  93. Returns:
  94. None
  95. Raises:
  96. SharedMemoryError when not enough space in this buffer
  97. """
  98. assert type(data) in [str, bytes], \
  99. 'invalid type[%s] for SharedBuffer::put' % (str(type(data)))
  100. if self._size > 0 and not override:
  101. raise SharedBufferError('already has already been setted before')
  102. if self.capacity() < len(data):
  103. raise SharedBufferError('data[%d] is larger than size of buffer[%s]'\
  104. % (len(data), str(self)))
  105. self.owner().put_data(self, data)
  106. self._size = len(data)
  107. def get(self, offset=0, size=None, no_copy=True):
  108. """ get the data stored this buffer
  109. Args:
  110. offset (int): position for the start point to 'get'
  111. size (int): size to get
  112. Returns:
  113. data (np.ndarray('uint8')): user's data in numpy
  114. which is passed in by 'put'
  115. None: if no data stored in
  116. """
  117. offset = offset if offset >= 0 else self._size + offset
  118. if self._size <= 0:
  119. return None
  120. size = self._size if size is None else size
  121. assert offset + size <= self._cap, 'invalid offset[%d] '\
  122. 'or size[%d] for capacity[%d]' % (offset, size, self._cap)
  123. return self.owner().get_data(self, offset, size, no_copy=no_copy)
  124. def size(self):
  125. """ bytes of used memory
  126. """
  127. return self._size
  128. def resize(self, size):
  129. """ resize the used memory to 'size', should not be greater than capacity
  130. """
  131. assert size >= 0 and size <= self._cap, \
  132. "invalid size[%d] for resize" % (size)
  133. self._size = size
  134. def capacity(self):
  135. """ size of allocated memory
  136. """
  137. return self._cap
  138. def __str__(self):
  139. """ human readable format
  140. """
  141. return "SharedBuffer(owner:%s, pos:%d, size:%d, "\
  142. "capacity:%d, alloc_status:[%s], pid:%d)" \
  143. % (str(self._owner), self._pos, self._size, \
  144. self._cap, self._alloc_status, os.getpid())
  145. def free(self):
  146. """ free this buffer to it's owner
  147. """
  148. if self._owner is not None:
  149. self.owner().free(self)
  150. self._owner = None
  151. self._cap = 0
  152. self._pos = -1
  153. self._size = 0
  154. return True
  155. else:
  156. return False
  157. class PageAllocator(object):
  158. """ allocator used to malloc and free shared memory which
  159. is split into pages
  160. """
  161. s_allocator_header = 12
  162. def __init__(self, base, total_pages, page_size):
  163. """ init
  164. """
  165. self._magic_num = 1234321000 + random.randint(100, 999)
  166. self._base = base
  167. self._total_pages = total_pages
  168. self._page_size = page_size
  169. header_pages = int(
  170. math.ceil((total_pages + self.s_allocator_header) / page_size))
  171. self._header_pages = header_pages
  172. self._free_pages = total_pages - header_pages
  173. self._header_size = self._header_pages * page_size
  174. self._reset()
  175. def _dump_alloc_info(self, fname):
  176. hpages, tpages, pos, used = self.header()
  177. start = self.s_allocator_header
  178. end = start + self._page_size * hpages
  179. try:
  180. alloc_flags = self._base[start:end].tobytes()
  181. except:
  182. alloc_flags = self._base[start:end].tostring()
  183. info = {
  184. 'magic_num': self._magic_num,
  185. 'header_pages': hpages,
  186. 'total_pages': tpages,
  187. 'pos': pos,
  188. 'used': used
  189. }
  190. info['alloc_flags'] = alloc_flags
  191. fname = fname + '.' + str(uuid.uuid4())[:6]
  192. with open(fname, 'wb') as f:
  193. f.write(pickle.dumps(info, -1))
  194. logger.warn('dump alloc info to file[%s]' % (fname))
  195. def _reset(self):
  196. alloc_page_pos = self._header_pages
  197. used_pages = self._header_pages
  198. header_info = struct.pack(
  199. str('III'), self._magic_num, alloc_page_pos, used_pages)
  200. assert len(header_info) == self.s_allocator_header, \
  201. 'invalid size of header_info'
  202. memcopy(self._base[0:self.s_allocator_header], header_info)
  203. self.set_page_status(0, self._header_pages, '1')
  204. self.set_page_status(self._header_pages, self._free_pages, '0')
  205. def header(self):
  206. """ get header info of this allocator
  207. """
  208. try:
  209. header_str = self._base[0:self.s_allocator_header].tobytes()
  210. except:
  211. header_str = self._base[0:self.s_allocator_header].tostring()
  212. magic, pos, used = struct.unpack(str('III'), header_str)
  213. assert magic == self._magic_num, \
  214. 'invalid header magic[%d] in shared memory' % (magic)
  215. return self._header_pages, self._total_pages, pos, used
  216. def empty(self):
  217. """ are all allocatable pages available
  218. """
  219. header_pages, pages, pos, used = self.header()
  220. return header_pages == used
  221. def full(self):
  222. """ are all allocatable pages used
  223. """
  224. header_pages, pages, pos, used = self.header()
  225. return header_pages + used == pages
  226. def __str__(self):
  227. header_pages, pages, pos, used = self.header()
  228. desc = '{page_info[magic:%d,total:%d,used:%d,header:%d,alloc_pos:%d,pagesize:%d]}' \
  229. % (self._magic_num, pages, used, header_pages, pos, self._page_size)
  230. return 'PageAllocator:%s' % (desc)
  231. def set_alloc_info(self, alloc_pos, used_pages):
  232. """ set allocating position to new value
  233. """
  234. memcopy(self._base[4:12],
  235. struct.pack(str('II'), alloc_pos, used_pages))
  236. def set_page_status(self, start, page_num, status):
  237. """ set pages from 'start' to 'end' with new same status 'status'
  238. """
  239. assert status in ['0', '1'], 'invalid status[%s] for page status '\
  240. 'in allocator[%s]' % (status, str(self))
  241. start += self.s_allocator_header
  242. end = start + page_num
  243. assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
  244. 'in allocator[%s]' % (end, str(self))
  245. memcopy(self._base[start:end], str(status * page_num))
  246. def get_page_status(self, start, page_num, ret_flag=False):
  247. start += self.s_allocator_header
  248. end = start + page_num
  249. assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
  250. 'in allocator[%s]' % (end, str(self))
  251. try:
  252. status = self._base[start:end].tobytes().decode()
  253. except:
  254. status = self._base[start:end].tostring().decode()
  255. if ret_flag:
  256. return status
  257. zero_num = status.count('0')
  258. if zero_num == 0:
  259. return (page_num, 1)
  260. else:
  261. return (zero_num, 0)
  262. def malloc_page(self, page_num):
  263. header_pages, pages, pos, used = self.header()
  264. end = pos + page_num
  265. if end > pages:
  266. pos = self._header_pages
  267. end = pos + page_num
  268. start_pos = pos
  269. flags = ''
  270. while True:
  271. # maybe flags already has some '0' pages,
  272. # so just check 'page_num - len(flags)' pages
  273. flags = self.get_page_status(pos, page_num, ret_flag=True)
  274. if flags.count('0') == page_num:
  275. break
  276. # not found enough pages, so shift to next few pages
  277. free_pos = flags.rfind('1') + 1
  278. pos += free_pos
  279. end = pos + page_num
  280. if end > pages:
  281. pos = self._header_pages
  282. end = pos + page_num
  283. flags = ''
  284. # not found available pages after scan all pages
  285. if pos <= start_pos and end >= start_pos:
  286. logger.debug('not found available pages after scan all pages')
  287. break
  288. page_status = (flags.count('0'), 0)
  289. if page_status != (page_num, 0):
  290. free_pages = self._total_pages - used
  291. if free_pages == 0:
  292. err_msg = 'all pages have been used:%s' % (str(self))
  293. else:
  294. err_msg = 'not found available pages with page_status[%s] '\
  295. 'and %d free pages' % (str(page_status), free_pages)
  296. err_msg = 'failed to malloc %d pages at pos[%d] for reason[%s] and allocator status[%s]' \
  297. % (page_num, pos, err_msg, str(self))
  298. raise MemoryFullError(err_msg)
  299. self.set_page_status(pos, page_num, '1')
  300. used += page_num
  301. self.set_alloc_info(end, used)
  302. return pos
  303. def free_page(self, start, page_num):
  304. """ free 'page_num' pages start from 'start'
  305. """
  306. page_status = self.get_page_status(start, page_num)
  307. assert page_status == (page_num, 1), \
  308. 'invalid status[%s] when free [%d, %d]' \
  309. % (str(page_status), start, page_num)
  310. self.set_page_status(start, page_num, '0')
  311. _, _, pos, used = self.header()
  312. used -= page_num
  313. self.set_alloc_info(pos, used)
  314. DEFAULT_SHARED_MEMORY_SIZE = 1024 * 1024 * 1024
  315. class SharedMemoryMgr(object):
  316. """ manage a continouse block of memory, provide
  317. 'malloc' to allocate new buffer, and 'free' to free buffer
  318. """
  319. s_memory_mgrs = weakref.WeakValueDictionary()
  320. s_mgr_num = 0
  321. s_log_statis = False
  322. @classmethod
  323. def get_mgr(cls, id):
  324. """ get a SharedMemoryMgr with size of 'capacity'
  325. """
  326. assert id in cls.s_memory_mgrs, 'invalid id[%s] for memory managers' % (
  327. id)
  328. return cls.s_memory_mgrs[id]
  329. def __init__(self, capacity=None, pagesize=None):
  330. """ init
  331. """
  332. logger.debug('create SharedMemoryMgr')
  333. pagesize = 64 * 1024 if pagesize is None else pagesize
  334. assert type(pagesize) is int, "invalid type of pagesize[%s]" \
  335. % (str(pagesize))
  336. capacity = DEFAULT_SHARED_MEMORY_SIZE if capacity is None else capacity
  337. assert type(capacity) is int, "invalid type of capacity[%s]" \
  338. % (str(capacity))
  339. assert capacity > 0, '"size of shared memory should be greater than 0'
  340. self._released = False
  341. self._cap = capacity
  342. self._page_size = pagesize
  343. assert self._cap % self._page_size == 0, \
  344. "capacity[%d] and pagesize[%d] are not consistent" \
  345. % (self._cap, self._page_size)
  346. self._total_pages = self._cap // self._page_size
  347. self._pid = os.getpid()
  348. SharedMemoryMgr.s_mgr_num += 1
  349. self._id = self._pid * 100 + SharedMemoryMgr.s_mgr_num
  350. SharedMemoryMgr.s_memory_mgrs[self._id] = self
  351. self._locker = Lock()
  352. self._setup()
  353. def _setup(self):
  354. self._shared_mem = RawArray('c', self._cap)
  355. self._base = np.frombuffer(
  356. self._shared_mem, dtype='uint8', count=self._cap)
  357. self._locker.acquire()
  358. try:
  359. self._allocator = PageAllocator(self._base, self._total_pages,
  360. self._page_size)
  361. finally:
  362. self._locker.release()
  363. def malloc(self, size, wait=True):
  364. """ malloc a new SharedBuffer
  365. Args:
  366. size (int): buffer size to be malloc
  367. wait (bool): whether to wait when no enough memory
  368. Returns:
  369. SharedBuffer
  370. Raises:
  371. SharedMemoryError when not found available memory
  372. """
  373. page_num = int(math.ceil(size / self._page_size))
  374. size = page_num * self._page_size
  375. start = None
  376. ct = 0
  377. errmsg = ''
  378. while True:
  379. self._locker.acquire()
  380. try:
  381. start = self._allocator.malloc_page(page_num)
  382. alloc_status = str(self._allocator)
  383. except MemoryFullError as e:
  384. start = None
  385. errmsg = e.errmsg
  386. if not wait:
  387. raise e
  388. finally:
  389. self._locker.release()
  390. if start is None:
  391. time.sleep(0.1)
  392. if ct % 100 == 0:
  393. logger.warn('not enough space for reason[%s]' % (errmsg))
  394. ct += 1
  395. else:
  396. break
  397. return SharedBuffer(self._id, size, start, alloc_status=alloc_status)
  398. def free(self, shared_buf):
  399. """ free a SharedBuffer
  400. Args:
  401. shared_buf (SharedBuffer): buffer to be freed
  402. Returns:
  403. None
  404. Raises:
  405. SharedMemoryError when failed to release this buffer
  406. """
  407. assert shared_buf._owner == self._id, "invalid shared_buf[%s] "\
  408. "for it's not allocated from me[%s]" % (str(shared_buf), str(self))
  409. cap = shared_buf.capacity()
  410. start_page = shared_buf._pos
  411. page_num = cap // self._page_size
  412. #maybe we don't need this lock here
  413. self._locker.acquire()
  414. try:
  415. self._allocator.free_page(start_page, page_num)
  416. finally:
  417. self._locker.release()
  418. def put_data(self, shared_buf, data):
  419. """ fill 'data' into 'shared_buf'
  420. """
  421. assert len(data) <= shared_buf.capacity(), 'too large data[%d] '\
  422. 'for this buffer[%s]' % (len(data), str(shared_buf))
  423. start = shared_buf._pos * self._page_size
  424. end = start + len(data)
  425. assert start >= 0 and end <= self._cap, "invalid start "\
  426. "position[%d] when put data to buff:%s" % (start, str(shared_buf))
  427. self._base[start:end] = np.frombuffer(data, 'uint8', len(data))
  428. def get_data(self, shared_buf, offset, size, no_copy=True):
  429. """ extract 'data' from 'shared_buf' in range [offset, offset + size)
  430. """
  431. start = shared_buf._pos * self._page_size
  432. start += offset
  433. if no_copy:
  434. return self._base[start:start + size]
  435. else:
  436. try:
  437. return self._base[start:start + size].tobytes()
  438. except:
  439. return self._base[start:start + size].tostring()
  440. def __str__(self):
  441. return 'SharedMemoryMgr:{id:%d, %s}' % (self._id, str(self._allocator))
  442. def __del__(self):
  443. if SharedMemoryMgr.s_log_statis:
  444. logger.info('destroy [%s]' % (self))
  445. if not self._released and not self._allocator.empty():
  446. logger.debug('not empty when delete this SharedMemoryMgr[%s]' %
  447. (self))
  448. else:
  449. self._released = True
  450. if self._id in SharedMemoryMgr.s_memory_mgrs:
  451. del SharedMemoryMgr.s_memory_mgrs[self._id]
  452. SharedMemoryMgr.s_mgr_num -= 1