Difference between revisions of "Multiprocess.Lock replacement using SYSV"

From Tech
Jump to navigationJump to search
 
Line 41: Line 41:
 
}
 
}
   
void acquire(int semid){
+
unsigned char acquire(int semid, unsigned char blocking){
 
struct sembuf sops[2];
 
struct sembuf sops[2];
 
 
Line 47: Line 47:
 
sops[0].sem_op = -1;
 
sops[0].sem_op = -1;
 
sops[0].sem_flg = SEM_UNDO;
 
sops[0].sem_flg = SEM_UNDO;
  +
if(!blocking)
  +
sops[0].sem_flg |= IPC_NOWAIT;
 
 
 
 
 
if (semop(semid, sops, 1) == -1) {
 
if (semop(semid, sops, 1) == -1) {
  +
if(errno==EAGAIN)
  +
return 0;
 
perror("acquire");
 
perror("acquire");
 
exit(1);
 
exit(1);
 
}
 
}
  +
return 1;
 
}
 
}
   
Line 115: Line 120:
 
{
 
{
 
int semid;
 
int semid;
  +
unsigned char blocking;
if (!PyArg_ParseTuple(args, "i", &semid))
 
  +
unsigned char success;
 
if (!PyArg_ParseTuple(args, "ib", &semid, &blocking))
 
return NULL;
 
return NULL;
acquire(semid);
+
success=acquire(semid, blocking);
return Py_BuildValue("");
+
return Py_BuildValue("b", success);
 
}
 
}
 
static PyObject *
 
static PyObject *
Line 142: Line 149:
 
return 0;
 
return 0;
   
 
}</nowiki>
}
 
</nowiki>
 
   
 
Contents of <tt>Lock.py</tt>
 
Contents of <tt>Lock.py</tt>
Line 151: Line 157:
 
def __init__(self):
 
def __init__(self):
 
self.semid=Lock_.get()
 
self.semid=Lock_.get()
def acquire(self):
+
def acquire(self, blocking=True):
Lock_.acquire(self.semid)
+
return Lock_.acquire(self.semid, blocking)
  +
#print "acquire", self.semid
 
def release(self):
 
def release(self):
  +
#print "release", self.semid
 
Lock_.release(self.semid)
 
Lock_.release(self.semid)
  +
def __enter__(self):
  +
self.acquire()
  +
def __exit__(self, type, value, traceback):
  +
self.release()
 
</nowiki>
 
</nowiki>
   
Line 166: Line 178:
   
 
Contents of <tt>test.py</tt>
 
Contents of <tt>test.py</tt>
<nowiki>import Lock
+
<nowiki>import time
import time
 
 
import random
 
import random
 
from multiprocessing import Process
 
from multiprocessing import Process
   
  +
from Lock import Lock
  +
#from multiprocessing import Lock
 
def test(l, a):
 
def test(l, a):
 
for i in range(4):
 
for i in range(4):
Line 177: Line 190:
 
print a
 
print a
 
time.sleep(random.random()/30)
 
time.sleep(random.random()/30)
print
 
 
l.release()
 
l.release()
l=Lock.Lock()
 
Process(target=test, args=(l,"a")).start()
 
Process(target=test, args=(l," b")).start()
 
Process(target=test, args=(l," c")).start()
 
   
  +
def test_with(l, a):
  +
for i in range(4):
  +
with l as L:
  +
for j in range(4):
 
print a
  +
time.sleep(random.random()/30)
  +
  +
  +
 
l=Lock()
  +
t=test_with
 
Process(target=t, args=(l,"a")).start()
 
Process(target=t, args=(l," b")).start()
 
Process(target=t, args=(l," c")).start()
 
</nowiki>
 
</nowiki>

Latest revision as of 23:55, 12 February 2013

For instance on the TS-7800, when trying to get a Lock() from multprocessing.Lock, this is the error message you get:

Traceback (most recent call last):
  File "LockTest.py", line 6, in <module>
    l=Lock()
  File "/usr/local/lib/python2.7/multiprocessing/__init__.py", line 175, in Lock
    from multiprocessing.synchronize import Lock
  File "/usr/local/lib/python2.7/multiprocessing/synchronize.py", line 59, in <module>
    " function, see issue 3770.")
ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

This can be worked around with the following:

Contents of Lock_.c

#include <Python.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int get_Lock(void){
  int semid;
  int r;
  union semun {
    int              val;    /* Value for SETVAL */
    struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
    unsigned short  *array;  /* Array for GETALL, SETALL */
    struct seminfo  *__buf;  /* Buffer for IPC_INFO
				(Linux-specific) */
  };
  union semun su;

  semid=semget(IPC_PRIVATE,  1, IPC_CREAT|0666);
  su.val=1;
  r=semctl(semid, 0, SETVAL, su);
  if(r==-1){
    perror("semctl");
    exit(1);
  }
  return semid;
}

unsigned char acquire(int semid, unsigned char blocking){
  struct sembuf sops[2];
    
  sops[0].sem_num = 0; 
  sops[0].sem_op = -1; 
  sops[0].sem_flg = SEM_UNDO;
  if(!blocking)
    sops[0].sem_flg |= IPC_NOWAIT;
  
  
  if (semop(semid, sops, 1) == -1) {
    if(errno==EAGAIN)
      return 0;
    perror("acquire");
    exit(1);
  }
  return 1;
}

void release(int semid){
  struct sembuf sops[2];
    
  sops[0].sem_num = 0;        /* Operate on semaphore 0 */
  sops[0].sem_op = 1;         /* Wait for value to equal 0 */
  sops[0].sem_flg = SEM_UNDO;
  
  
  if (semop(semid, sops, 1) == -1) {
    perror("release");
    exit(1);
  }
}

static PyObject * Lock_get(PyObject *self, PyObject *args);
static PyObject * Lock_acquire(PyObject *self, PyObject *args);
static PyObject * Lock_release(PyObject *self, PyObject *args);

static PyMethodDef LockMethods[] = {
    {"get",      Lock_get, METH_VARARGS,     "get a SYSV Lock"},
    {"acquire",  Lock_acquire, METH_VARARGS, "acquire lock (blocks if unavailable)"},
    {"release",  Lock_release, METH_VARARGS, "release lock"},
    {NULL, NULL, 0, NULL}        /* Sentinel */
};



static PyObject *LockError;
PyMODINIT_FUNC
initLock_(void)
{
    PyObject *m;

    m = Py_InitModule("Lock_", LockMethods);
    if (m == NULL)
        return;

    LockError = PyErr_NewException("Lock_.error", NULL, NULL);
    Py_INCREF(LockError);
    PyModule_AddObject(m, "error", LockError);
}



static PyObject *
Lock_get(PyObject *self, PyObject *args)
{
    int semid;

    semid=get_Lock();
    if ( semid==-1) {
        PyErr_SetString(LockError, "System command failed");
        return NULL;
    }
    return Py_BuildValue("i", semid);
}
static PyObject *
Lock_acquire(PyObject *self, PyObject *args)
{
    int semid;
    unsigned char blocking;
    unsigned char success;
    if (!PyArg_ParseTuple(args, "ib", &semid, &blocking))
      return NULL;
    success=acquire(semid, blocking);
    return Py_BuildValue("b", success);
}
static PyObject *
Lock_release(PyObject *self, PyObject *args)
{
    int semid;
    if (!PyArg_ParseTuple(args, "i", &semid))
      return NULL;
    release(semid);
    return Py_BuildValue("");
}

int main(int argc, char *argv[])
{
    /* Pass argv[0] to the Python interpreter */
    Py_SetProgramName(argv[0]);

    /* Initialize the Python interpreter.  Required. */
    Py_Initialize();

    /* Add a static module */
    initLock_();
    return 0;

}

Contents of Lock.py

import Lock_

class Lock:
    def __init__(self):
        self.semid=Lock_.get()
    def acquire(self, blocking=True):
        return Lock_.acquire(self.semid, blocking)
        #print "acquire", self.semid
    def release(self):
        #print "release", self.semid
        Lock_.release(self.semid)
    def __enter__(self):
        self.acquire()
    def __exit__(self, type, value, traceback):
        self.release()

Contents of Makefile

CFLAGS=-Wall -fPIC -I /usr/include/python2.7
#LDFLAGS=-lpython2.7

Lock_.so:Lock_.o
	gcc -shared Lock_.o -o Lock_.so

Contents of test.py

import time
import random
from multiprocessing import Process

from Lock import Lock
#from multiprocessing import Lock
def test(l, a):
    for i in range(4):
        l.acquire()
        for j in range(4):
            print a
            time.sleep(random.random()/30)
        l.release()

def test_with(l, a):
    for i in range(4):
        with l as L:
            for j in range(4):
                print a
                time.sleep(random.random()/30)



l=Lock()
t=test_with
Process(target=t, args=(l,"a")).start()
Process(target=t, args=(l,"  b")).start()
Process(target=t, args=(l,"    c")).start()