Preview: deadlock_detecting_lock.py
Size: 755 B
//opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/deadlock_detecting_lock.py
import asyncio
class DeadlockError(Exception):
"""Error raised if DeadlockDetectingLock detects deadlock"""
class DeadlockDetectingLock:
"""
Lock that detects deadlock when it is about to be
acquired by the same task that already holds it.
"""
def __init__(self):
self._lock = asyncio.Lock()
self._owner = None
def locked(self):
return self._lock.locked()
async def __aenter__(self):
curr_task = asyncio.current_task()
if self._owner == curr_task:
raise DeadlockError()
await self._lock.acquire()
self._owner = curr_task
return self
async def __aexit__(self, exc_type, exc, tb):
self._owner = None
self._lock.release()
Directory Contents
Dirs: 1 × Files: 13