Use posix_ipc to manage shared memory

This commit is contained in:
Nicolas Mowen 2024-11-16 06:34:19 -07:00
parent ad85f8882b
commit 707f62c891

View File

@ -3,15 +3,13 @@
import datetime import datetime
import logging import logging
import subprocess as sp import subprocess as sp
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing import resource_tracker as _mprt
from multiprocessing import shared_memory as _mpshm
from string import printable from string import printable
from typing import AnyStr, Optional from typing import AnyStr, Optional
import cv2 import cv2
import numpy as np import numpy as np
import posix_ipc
from unidecode import unidecode from unidecode import unidecode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -733,48 +731,26 @@ class FrameManager(ABC):
pass pass
class UntrackedSharedMemory(_mpshm.SharedMemory): class UntrackedSharedMemory:
# https://github.com/python/cpython/issues/82300#issuecomment-2169035092
__lock = threading.Lock()
def __init__( def __init__(
self, self,
name: Optional[str] = None, name: Optional[str] = None,
create: bool = False, create: bool = False,
unlink: bool = False,
size: int = 0, size: int = 0,
*,
track: bool = False,
) -> None: ) -> None:
self._track = track if unlink:
# if tracking, normal init will suffice
if track:
return super().__init__(name=name, create=create, size=size)
# lock so that other threads don't attempt to use the
# register function during this time
with self.__lock:
# temporarily disable registration during initialization
orig_register = _mprt.register
_mprt.register = self.__tmp_register
# initialize; ensure original register function is
# re-instated
try:
super().__init__(name=name, create=create, size=size)
finally:
_mprt.register = orig_register
@staticmethod
def __tmp_register(*args, **kwargs) -> None:
return return
def unlink(self) -> None: flag = posix_ipc.O_CREAT if create else 0
if _mpshm._USE_POSIX and self._name: self.shm_store = posix_ipc.SharedMemory(name, flags=flag, size=size)
_mpshm._posixshmem.shm_unlink(self._name)
if self._track: def close(self) -> None:
_mprt.unregister(self._name, "shared_memory") self.shm_store.fd_close()
def unlink(self, name: str) -> None:
shm = posix_ipc.SharedMemory(name, flags=posix_ipc.O_TRUNC, size=0)
shm.unlink()
class SharedMemoryFrameManager(FrameManager): class SharedMemoryFrameManager(FrameManager):
@ -811,14 +787,14 @@ class SharedMemoryFrameManager(FrameManager):
self.shm_store[name].close() self.shm_store[name].close()
try: try:
self.shm_store[name].unlink() self.shm_store[name].unlink(name)
except FileNotFoundError: except FileNotFoundError:
pass pass
del self.shm_store[name] del self.shm_store[name]
else: else:
try: try:
shm = UntrackedSharedMemory(name=name) shm = UntrackedSharedMemory(name=name, unlink=True)
shm.close() shm.close()
shm.unlink() shm.unlink()
except FileNotFoundError: except FileNotFoundError: