#include #include #include #define PY_SSIZE_T_CLEAN #include #include #include "src/threadpool/threadpool.h" static const unsigned CTX_POOL_SIZE_DEFAULT = 8; static const unsigned CTX_MAX_REQUESTS_DEFAULT = 512; static PyTypeObject AIOOperationType; static PyTypeObject AIOContextType; typedef struct { PyObject_HEAD threadpool_t* pool; uint16_t max_requests; uint8_t pool_size; } AIOContext; typedef struct { PyObject_HEAD PyObject* py_buffer; PyObject* callback; int opcode; unsigned int fileno; off_t offset; int result; uint8_t error; uint8_t in_progress; Py_ssize_t buf_size; char* buf; PyObject* ctx; } AIOOperation; enum THAIO_OP_CODE { THAIO_READ, THAIO_WRITE, THAIO_FSYNC, THAIO_FDSYNC, THAIO_NOOP, }; static void AIOContext_dealloc(AIOContext *self) { if (self->pool != 0) { threadpool_t* pool = self->pool; self->pool = 0; threadpool_destroy(pool, 0); } Py_TYPE(self)->tp_free((PyObject *) self); } /* AIOContext.__new__ classmethod definition */ static PyObject * AIOContext_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { AIOContext *self; self = (AIOContext *) type->tp_alloc(type, 0); return (PyObject *) self; } static int AIOContext_init(AIOContext *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"max_requests", "pool_size", NULL}; self->pool = NULL; self->max_requests = 0; if (!PyArg_ParseTupleAndKeywords( args, kwds, "|HH", kwlist, &self->max_requests, &self->pool_size )) return -1; if (self->max_requests <= 0) { self->max_requests = CTX_MAX_REQUESTS_DEFAULT; } if (self->pool_size <= 0) { self->pool_size = CTX_POOL_SIZE_DEFAULT; } if (self->pool_size > MAX_THREADS) { PyErr_Format( PyExc_ValueError, "pool_size too large. Allowed lower then %d", MAX_THREADS ); return -1; } if (self->max_requests >= (MAX_QUEUE - 1)) { PyErr_Format( PyExc_ValueError, "max_requests too large. Allowed lower then %d", MAX_QUEUE - 1 ); return -1; } self->pool = threadpool_create(self->pool_size, self->max_requests, 0); if (self->pool == NULL) { PyErr_Format( PyExc_RuntimeError, "Pool initialization failed size=%d max_requests=%d", self->pool_size, self->max_requests ); return -1; } return 0; } static PyObject* AIOContext_repr(AIOContext *self) { if (self->pool == NULL) { PyErr_SetString(PyExc_RuntimeError, "Pool not initialized"); return NULL; } return PyUnicode_FromFormat( "<%s as %p: max_requests=%i, pool_size=%i, ctx=%lli>", Py_TYPE(self)->tp_name, self, self->max_requests, self->pool_size, self->pool ); } void worker(void *arg) { PyGILState_STATE state; AIOOperation* op = arg; PyObject* ctx = op->ctx; op->ctx = NULL; op->error = 0; if (op->opcode == THAIO_NOOP) { state = PyGILState_Ensure(); op->ctx = NULL; Py_DECREF(ctx); Py_DECREF(op); PyGILState_Release(state); return; } int fileno = op->fileno; off_t offset = op->offset; int buf_size = op->buf_size; char* buf = op->buf; int result; switch (op->opcode) { case THAIO_WRITE: result = pwrite(fileno, (const char*) buf, buf_size, offset); break; case THAIO_FSYNC: result = fsync(fileno); break; case THAIO_FDSYNC: #ifdef HAVE_FDATASYNC result = fdatasync(fileno); #else result = fsync(fileno); #endif break; case THAIO_READ: result = pread(fileno, buf, buf_size, offset); break; } op->ctx = NULL; op->result = result; if (result < 0) op->error = errno; if (op->opcode == THAIO_READ) { op->buf_size = result; } state = PyGILState_Ensure(); if (op->callback != NULL) { PyObject_CallFunction(op->callback, "i", result); } if (op->opcode == THAIO_WRITE) { Py_DECREF(op->py_buffer); op->py_buffer = NULL; } Py_DECREF(ctx); Py_DECREF(op); PyGILState_Release(state); } inline int process_pool_error(int code) { switch (code) { case threadpool_invalid: PyErr_SetString( PyExc_RuntimeError, "Thread pool pointer is invalid" ); return code; case threadpool_lock_failure: PyErr_SetString( PyExc_RuntimeError, "Failed to lock thread pool" ); return code; case threadpool_queue_full: PyErr_Format( PyExc_RuntimeError, "Thread pool queue full" ); return code; case threadpool_shutdown: PyErr_SetString( PyExc_RuntimeError, "Thread pool is shutdown" ); return code; case threadpool_thread_failure: PyErr_SetString( PyExc_RuntimeError, "Thread failure" ); return code; } if (code < 0) PyErr_SetString(PyExc_RuntimeError, "Unknown error"); return code; } PyDoc_STRVAR(AIOContext_submit_docstring, "Accepts multiple Operations. Returns \n\n" " Operation.submit(aio_op1, aio_op2, aio_opN, ...) -> int" ); static PyObject* AIOContext_submit( AIOContext *self, PyObject *args ) { if (self == NULL) { PyErr_SetString(PyExc_RuntimeError, "self is NULL"); return NULL; } if (self->pool == NULL) { PyErr_SetString(PyExc_RuntimeError, "self->pool is NULL"); return NULL; } if (!PyTuple_Check(args)) { PyErr_SetNone(PyExc_ValueError); return NULL; } unsigned int nr = PyTuple_Size(args); PyObject* obj; AIOOperation* ops[nr]; unsigned int i; for (i=0; i < nr; i++) { obj = PyTuple_GetItem(args, i); if (PyObject_TypeCheck(obj, &AIOOperationType) == 0) { PyErr_Format( PyExc_TypeError, "Wrong type for argument %d", i ); return NULL; } ops[i] = (AIOOperation*) obj; ops[i]->ctx = (void*) self; } unsigned int j=0; int result = 0; for (i=0; i < nr; i++) { if (ops[i]->in_progress) continue; ops[i]->in_progress = 1; Py_INCREF(ops[i]); Py_INCREF(ops[i]->ctx); result = threadpool_add(self->pool, worker, (void*) ops[i], 0); if (process_pool_error(result) < 0) return NULL; j++; } return (PyObject*) PyLong_FromSsize_t(j); } PyDoc_STRVAR(AIOContext_cancel_docstring, "Cancels multiple Operations. Returns \n\n" " Operation.cancel(aio_op1, aio_op2, aio_opN, ...) -> int\n\n" "(Always returns zero, this method exists for compatibility reasons)" ); static PyObject* AIOContext_cancel( AIOContext *self, PyObject *args ) { return (PyObject*) PyLong_FromSsize_t(0); } /* AIOContext properties */ static PyMemberDef AIOContext_members[] = { { "pool_size", T_INT, offsetof(AIOContext, pool_size), READONLY, "pool_size" }, { "max_requests", T_USHORT, offsetof(AIOContext, max_requests), READONLY, "max requests" }, {NULL} /* Sentinel */ }; static PyMethodDef AIOContext_methods[] = { { "submit", (PyCFunction) AIOContext_submit, METH_VARARGS, AIOContext_submit_docstring }, { "cancel", (PyCFunction) AIOContext_cancel, METH_VARARGS, AIOContext_cancel_docstring }, {NULL} /* Sentinel */ }; static PyTypeObject AIOContextType = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "Context", .tp_doc = "thread aio context", .tp_basicsize = sizeof(AIOContext), .tp_itemsize = 0, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_new = AIOContext_new, .tp_init = (initproc) AIOContext_init, .tp_dealloc = (destructor) AIOContext_dealloc, .tp_members = AIOContext_members, .tp_methods = AIOContext_methods, .tp_repr = (reprfunc) AIOContext_repr }; static void AIOOperation_dealloc(AIOOperation *self) { Py_CLEAR(self->callback); if ((self->opcode == THAIO_READ) && self->buf != NULL) { PyMem_Free(self->buf); self->buf = NULL; } Py_CLEAR(self->py_buffer); Py_TYPE(self)->tp_free((PyObject *) self); } static PyObject* AIOOperation_repr(AIOOperation *self) { char* mode; switch (self->opcode) { case THAIO_READ: mode = "read"; break; case THAIO_WRITE: mode = "write"; break; case THAIO_FSYNC: mode = "fsync"; break; case THAIO_FDSYNC: mode = "fdsync"; break; default: mode = "noop"; break; } return PyUnicode_FromFormat( "<%s at %p: mode=\"%s\", fd=%i, offset=%i, result=%i, buffer=%p>", Py_TYPE(self)->tp_name, self, mode, self->fileno, self->offset, self->result, self->buf ); } /* AIOOperation.read classmethod definition */ PyDoc_STRVAR(AIOOperation_read_docstring, "Creates a new instance of Operation on read mode.\n\n" " Operation.read(\n" " nbytes: int,\n" " aio_context: Context,\n" " fd: int, \n" " offset: int,\n" " priority=0\n" " )" ); static PyObject* AIOOperation_read( PyTypeObject *type, PyObject *args, PyObject *kwds ) { AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0); static char *kwlist[] = {"nbytes", "fd", "offset", "priority", NULL}; if (self == NULL) { PyErr_SetString(PyExc_MemoryError, "can not allocate memory"); return NULL; } self->buf = NULL; self->py_buffer = NULL; self->in_progress = 0; uint64_t nbytes = 0; uint16_t priority; int argIsOk = PyArg_ParseTupleAndKeywords( args, kwds, "KI|LH", kwlist, &nbytes, &(self->fileno), &(self->offset), &priority ); if (!argIsOk) return NULL; self->buf = PyMem_Calloc(nbytes, sizeof(char)); self->buf_size = nbytes; self->py_buffer = PyMemoryView_FromMemory( self->buf, self->buf_size, PyBUF_READ ); self->opcode = THAIO_READ; return (PyObject*) self; } /* AIOOperation.write classmethod definition */ PyDoc_STRVAR(AIOOperation_write_docstring, "Creates a new instance of Operation on write mode.\n\n" " Operation.write(\n" " payload_bytes: bytes,\n" " fd: int, \n" " offset: int,\n" " priority=0\n" " )" ); static PyObject* AIOOperation_write( PyTypeObject *type, PyObject *args, PyObject *kwds ) { AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0); static char *kwlist[] = {"payload_bytes", "fd", "offset", "priority", NULL}; if (self == NULL) { PyErr_SetString(PyExc_MemoryError, "can not allocate memory"); return NULL; } // unused uint16_t priority; self->buf = NULL; self->py_buffer = NULL; self->in_progress = 0; int argIsOk = PyArg_ParseTupleAndKeywords( args, kwds, "OI|LH", kwlist, &(self->py_buffer), &(self->fileno), &(self->offset), &priority ); if (!argIsOk) return NULL; if (!PyBytes_Check(self->py_buffer)) { Py_XDECREF(self); PyErr_SetString( PyExc_ValueError, "payload_bytes argument must be bytes" ); return NULL; } self->opcode = THAIO_WRITE; if (PyBytes_AsStringAndSize( self->py_buffer, &self->buf, &self->buf_size )) { Py_XDECREF(self); PyErr_SetString( PyExc_RuntimeError, "Can not convert bytes to c string" ); return NULL; } Py_INCREF(self->py_buffer); return (PyObject*) self; } /* AIOOperation.fsync classmethod definition */ PyDoc_STRVAR(AIOOperation_fsync_docstring, "Creates a new instance of Operation on fsync mode.\n\n" " Operation.fsync(\n" " aio_context: AIOContext,\n" " fd: int, \n" " priority=0\n" " )" ); static PyObject* AIOOperation_fsync( PyTypeObject *type, PyObject *args, PyObject *kwds ) { AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0); static char *kwlist[] = {"fd", "priority", NULL}; if (self == NULL) { PyErr_SetString(PyExc_MemoryError, "can not allocate memory"); return NULL; } uint16_t priority; self->buf = NULL; self->py_buffer = NULL; self->in_progress = 0; int argIsOk = PyArg_ParseTupleAndKeywords( args, kwds, "I|H", kwlist, &(self->fileno), &priority ); if (!argIsOk) return NULL; self->opcode = THAIO_FSYNC; return (PyObject*) self; } /* AIOOperation.fdsync classmethod definition */ PyDoc_STRVAR(AIOOperation_fdsync_docstring, "Creates a new instance of Operation on fdsync mode.\n\n" " Operation.fdsync(\n" " aio_context: AIOContext,\n" " fd: int, \n" " priority=0\n" " )" ); static PyObject* AIOOperation_fdsync( PyTypeObject *type, PyObject *args, PyObject *kwds ) { AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0); static char *kwlist[] = {"fd", "priority", NULL}; if (self == NULL) { PyErr_SetString(PyExc_MemoryError, "can not allocate memory"); return NULL; } self->buf = NULL; self->py_buffer = NULL; self->in_progress = 0; uint16_t priority; int argIsOk = PyArg_ParseTupleAndKeywords( args, kwds, "I|H", kwlist, &(self->fileno), &priority ); if (!argIsOk) return NULL; self->opcode = THAIO_FDSYNC; return (PyObject*) self; } /* AIOOperation.get_value method definition */ PyDoc_STRVAR(AIOOperation_get_value_docstring, "Method returns a bytes value of Operation's result or None.\n\n" " Operation.get_value() -> Optional[bytes]" ); static PyObject* AIOOperation_get_value( AIOOperation *self, PyObject *args, PyObject *kwds ) { if (self->error != 0) { PyErr_SetString( PyExc_SystemError, strerror(self->error) ); return NULL; } switch (self->opcode) { case THAIO_READ: return PyBytes_FromStringAndSize( self->buf, self->buf_size ); case THAIO_WRITE: return PyLong_FromSsize_t(self->result); } return Py_None; } /* AIOOperation.get_value method definition */ PyDoc_STRVAR(AIOOperation_set_callback_docstring, "Set callback which will be called after Operation will be finished.\n\n" " Operation.get_value() -> Optional[bytes]" ); static PyObject* AIOOperation_set_callback( AIOOperation *self, PyObject *args, PyObject *kwds ) { static char *kwlist[] = {"callback", NULL}; PyObject* callback; int argIsOk = PyArg_ParseTupleAndKeywords( args, kwds, "O", kwlist, &callback ); if (!argIsOk) return NULL; if (!PyCallable_Check(callback)) { PyErr_Format( PyExc_ValueError, "object %r is not callable", callback ); return NULL; } Py_INCREF(callback); self->callback = callback; Py_RETURN_TRUE; } /* AIOOperation properties */ static PyMemberDef AIOOperation_members[] = { { "fileno", T_UINT, offsetof(AIOOperation, fileno), READONLY, "file descriptor" }, { "offset", T_ULONGLONG, offsetof(AIOOperation, offset), READONLY, "offset" }, { "payload", T_OBJECT, offsetof(AIOOperation, py_buffer), READONLY, "payload" }, { "nbytes", T_ULONGLONG, offsetof(AIOOperation, buf_size), READONLY, "nbytes" }, { "result", T_INT, offsetof(AIOOperation, result), READONLY, "result" }, { "error", T_INT, offsetof(AIOOperation, error), READONLY, "error" }, {NULL} /* Sentinel */ }; /* AIOOperation methods */ static PyMethodDef AIOOperation_methods[] = { { "read", (PyCFunction) AIOOperation_read, METH_CLASS | METH_VARARGS | METH_KEYWORDS, AIOOperation_read_docstring }, { "write", (PyCFunction) AIOOperation_write, METH_CLASS | METH_VARARGS | METH_KEYWORDS, AIOOperation_write_docstring }, { "fsync", (PyCFunction) AIOOperation_fsync, METH_CLASS | METH_VARARGS | METH_KEYWORDS, AIOOperation_fsync_docstring }, { "fdsync", (PyCFunction) AIOOperation_fdsync, METH_CLASS | METH_VARARGS | METH_KEYWORDS, AIOOperation_fdsync_docstring }, { "get_value", (PyCFunction) AIOOperation_get_value, METH_NOARGS, AIOOperation_get_value_docstring }, { "set_callback", (PyCFunction) AIOOperation_set_callback, METH_VARARGS | METH_KEYWORDS, AIOOperation_set_callback_docstring }, {NULL} /* Sentinel */ }; /* AIOOperation class */ static PyTypeObject AIOOperationType = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "aio.AIOOperation", .tp_doc = "thread aio operation representation", .tp_basicsize = sizeof(AIOOperation), .tp_itemsize = 0, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_dealloc = (destructor) AIOOperation_dealloc, .tp_members = AIOOperation_members, .tp_methods = AIOOperation_methods, .tp_repr = (reprfunc) AIOOperation_repr }; static PyModuleDef thread_aio_module = { PyModuleDef_HEAD_INIT, .m_name = "thread_aio", .m_doc = "Thread based AIO.", .m_size = -1, }; PyMODINIT_FUNC PyInit_thread_aio(void) { Py_Initialize(); PyObject *m; m = PyModule_Create(&thread_aio_module); if (m == NULL) return NULL; if (PyType_Ready(&AIOContextType) < 0) return NULL; Py_INCREF(&AIOContextType); if (PyModule_AddObject(m, "Context", (PyObject *) &AIOContextType) < 0) { Py_XDECREF(&AIOContextType); Py_XDECREF(m); return NULL; } if (PyType_Ready(&AIOOperationType) < 0) return NULL; Py_INCREF(&AIOOperationType); if (PyModule_AddObject(m, "Operation", (PyObject *) &AIOOperationType) < 0) { Py_XDECREF(&AIOOperationType); Py_XDECREF(m); return NULL; } return m; }