Difference between revisions of "Multiprocess.Lock replacement using SYSV"
From Tech
Jump to navigationJump to search(2 intermediate revisions by 2 users not shown) | |||
Line 1: | Line 1: | ||
+ | For instance on the TS-7800, when trying to get a Lock() from multprocessing.Lock, this is the error message you get: |
||
⚫ | |||
+ | Traceback (most recent call last): |
||
+ | File "LockTest.py", line 6, in <module> |
||
⚫ | |||
+ | File "/usr/local/lib/python2.7/multiprocessing/__init__.py", line 175, in Lock |
||
+ | from multiprocessing.synchronize import Lock |
||
+ | File "/usr/local/lib/python2.7/multiprocessing/synchronize.py", line 59, in <module> |
||
+ | " function, see issue 3770.") |
||
+ | ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770. |
||
+ | </nowiki> |
||
+ | |||
+ | This can be worked around with the following: |
||
+ | |||
Contents of <tt>Lock_.c</tt> |
Contents of <tt>Lock_.c</tt> |
||
<nowiki>#include <Python.h> |
<nowiki>#include <Python.h> |
||
Line 27: | Line 41: | ||
} |
} |
||
− | + | unsigned char acquire(int semid, unsigned char blocking){ |
|
struct sembuf sops[2]; |
struct sembuf sops[2]; |
||
Line 33: | Line 47: | ||
sops[0].sem_op = -1; |
sops[0].sem_op = -1; |
||
sops[0].sem_flg = SEM_UNDO; |
sops[0].sem_flg = SEM_UNDO; |
||
+ | if(!blocking) |
||
+ | sops[0].sem_flg |= IPC_NOWAIT; |
||
if (semop(semid, sops, 1) == -1) { |
if (semop(semid, sops, 1) == -1) { |
||
+ | if(errno==EAGAIN) |
||
+ | return 0; |
||
perror("acquire"); |
perror("acquire"); |
||
exit(1); |
exit(1); |
||
} |
} |
||
+ | return 1; |
||
} |
} |
||
Line 101: | Line 120: | ||
{ |
{ |
||
int semid; |
int semid; |
||
+ | unsigned char blocking; |
||
⚫ | |||
+ | unsigned char success; |
||
⚫ | |||
return NULL; |
return NULL; |
||
− | acquire(semid); |
+ | success=acquire(semid, blocking); |
− | return Py_BuildValue(""); |
+ | return Py_BuildValue("b", success); |
} |
} |
||
static PyObject * |
static PyObject * |
||
Line 128: | Line 149: | ||
return 0; |
return 0; |
||
+ | }</nowiki> |
||
− | } |
||
⚫ | |||
Contents of <tt>Lock.py</tt> |
Contents of <tt>Lock.py</tt> |
||
Line 137: | Line 157: | ||
def __init__(self): |
def __init__(self): |
||
self.semid=Lock_.get() |
self.semid=Lock_.get() |
||
− | def acquire(self): |
+ | def acquire(self, blocking=True): |
− | Lock_.acquire(self.semid) |
+ | return Lock_.acquire(self.semid, blocking) |
+ | #print "acquire", self.semid |
||
def release(self): |
def release(self): |
||
+ | #print "release", self.semid |
||
Lock_.release(self.semid) |
Lock_.release(self.semid) |
||
+ | def __enter__(self): |
||
+ | self.acquire() |
||
+ | def __exit__(self, type, value, traceback): |
||
+ | self.release() |
||
</nowiki> |
</nowiki> |
||
Line 152: | Line 178: | ||
Contents of <tt>test.py</tt> |
Contents of <tt>test.py</tt> |
||
− | <nowiki>import |
+ | <nowiki>import time |
− | import time |
||
import random |
import random |
||
from multiprocessing import Process |
from multiprocessing import Process |
||
+ | from Lock import Lock |
||
+ | #from multiprocessing import Lock |
||
def test(l, a): |
def test(l, a): |
||
− | for i in range( |
+ | for i in range(4): |
l.acquire() |
l.acquire() |
||
− | for j in range( |
+ | for j in range(4): |
print a |
print a |
||
− | time.sleep(random.random()) |
+ | time.sleep(random.random()/30) |
⚫ | |||
l.release() |
l.release() |
||
+ | |||
⚫ | |||
+ | def test_with(l, a): |
||
⚫ | |||
+ | for i in range(4): |
||
⚫ | |||
+ | with l as L: |
||
+ | for j in range(4): |
||
⚫ | |||
+ | time.sleep(random.random()/30) |
||
+ | |||
+ | |||
+ | |||
+ | l=Lock() |
||
+ | t=test_with |
||
⚫ | |||
⚫ | |||
+ | Process(target=t, args=(l," c")).start() |
||
</nowiki> |
</nowiki> |
Latest revision as of 22:55, 12 February 2013
For instance on the TS-7800, when trying to get a Lock() from multprocessing.Lock, this is the error message you get:
Traceback (most recent call last): File "LockTest.py", line 6, in <module> l=Lock() File "/usr/local/lib/python2.7/multiprocessing/__init__.py", line 175, in Lock from multiprocessing.synchronize import Lock File "/usr/local/lib/python2.7/multiprocessing/synchronize.py", line 59, in <module> " function, see issue 3770.") ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.
This can be worked around with the following:
Contents of Lock_.c
#include <Python.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int get_Lock(void){ int semid; int r; union semun { int val; /* Value for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short *array; /* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */ }; union semun su; semid=semget(IPC_PRIVATE, 1, IPC_CREAT|0666); su.val=1; r=semctl(semid, 0, SETVAL, su); if(r==-1){ perror("semctl"); exit(1); } return semid; } unsigned char acquire(int semid, unsigned char blocking){ struct sembuf sops[2]; sops[0].sem_num = 0; sops[0].sem_op = -1; sops[0].sem_flg = SEM_UNDO; if(!blocking) sops[0].sem_flg |= IPC_NOWAIT; if (semop(semid, sops, 1) == -1) { if(errno==EAGAIN) return 0; perror("acquire"); exit(1); } return 1; } void release(int semid){ struct sembuf sops[2]; sops[0].sem_num = 0; /* Operate on semaphore 0 */ sops[0].sem_op = 1; /* Wait for value to equal 0 */ sops[0].sem_flg = SEM_UNDO; if (semop(semid, sops, 1) == -1) { perror("release"); exit(1); } } static PyObject * Lock_get(PyObject *self, PyObject *args); static PyObject * Lock_acquire(PyObject *self, PyObject *args); static PyObject * Lock_release(PyObject *self, PyObject *args); static PyMethodDef LockMethods[] = { {"get", Lock_get, METH_VARARGS, "get a SYSV Lock"}, {"acquire", Lock_acquire, METH_VARARGS, "acquire lock (blocks if unavailable)"}, {"release", Lock_release, METH_VARARGS, "release lock"}, {NULL, NULL, 0, NULL} /* Sentinel */ }; static PyObject *LockError; PyMODINIT_FUNC initLock_(void) { PyObject *m; m = Py_InitModule("Lock_", LockMethods); if (m == NULL) return; LockError = PyErr_NewException("Lock_.error", NULL, NULL); Py_INCREF(LockError); PyModule_AddObject(m, "error", LockError); } static PyObject * Lock_get(PyObject *self, PyObject *args) { int semid; semid=get_Lock(); if ( semid==-1) { PyErr_SetString(LockError, "System command failed"); return NULL; } return Py_BuildValue("i", semid); } static PyObject * Lock_acquire(PyObject *self, PyObject *args) { int semid; unsigned char blocking; unsigned char success; if (!PyArg_ParseTuple(args, "ib", &semid, &blocking)) return NULL; success=acquire(semid, blocking); return Py_BuildValue("b", success); } static PyObject * Lock_release(PyObject *self, PyObject *args) { int semid; if (!PyArg_ParseTuple(args, "i", &semid)) return NULL; release(semid); return Py_BuildValue(""); } int main(int argc, char *argv[]) { /* Pass argv[0] to the Python interpreter */ Py_SetProgramName(argv[0]); /* Initialize the Python interpreter. Required. */ Py_Initialize(); /* Add a static module */ initLock_(); return 0; }
Contents of Lock.py
import Lock_ class Lock: def __init__(self): self.semid=Lock_.get() def acquire(self, blocking=True): return Lock_.acquire(self.semid, blocking) #print "acquire", self.semid def release(self): #print "release", self.semid Lock_.release(self.semid) def __enter__(self): self.acquire() def __exit__(self, type, value, traceback): self.release()
Contents of Makefile
CFLAGS=-Wall -fPIC -I /usr/include/python2.7 #LDFLAGS=-lpython2.7 Lock_.so:Lock_.o gcc -shared Lock_.o -o Lock_.so
Contents of test.py
import time import random from multiprocessing import Process from Lock import Lock #from multiprocessing import Lock def test(l, a): for i in range(4): l.acquire() for j in range(4): print a time.sleep(random.random()/30) l.release() def test_with(l, a): for i in range(4): with l as L: for j in range(4): print a time.sleep(random.random()/30) l=Lock() t=test_with Process(target=t, args=(l,"a")).start() Process(target=t, args=(l," b")).start() Process(target=t, args=(l," c")).start()