I've been somewhat enamored with CouchDB lately. When I began, I had some concerns about the speed. I had written similar stores based on XML, and while they had their advantages, speed was never one of them. My understanding is that CouchDB manages to stay relatively speedy by using a BTree for storing its views. I looked around for a BTree implementation in Python and found that bsddb has a built-in BTree storage option. It seemed like it would be trivial to make a simple index using bsddb for persistence and caching views, and in fact, it was pretty easy.
import bsddb
from collections import defaultdict
from itertools import ifilter, imap
from UserDict import UserDict
from multiprocessing import Pool
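
class RawIndex(UserDict):
    """Thin dict-like wrapper around a bsddb BTree file."""

    def __init__(self, fn):
        self.fn = fn
        self._open()
        self._cache = set()

    def _open(self):
        # UserDict keeps its mapping in self.data, so the BTree backs the dict API.
        self.data = bsddb.btopen(self.fn)

    def sync(self):
        # Flush to disk, then close and reopen the file to get a fresh handle.
        self.data.sync()
        self.data.close()
        self._open()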
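
class MappedIndex(RawIndex):
    """Index that is searched by running a map function over every record."""

    def __init__(self, fn, map_func=None):
        RawIndex.__init__(self, fn)
        # Search results, keyed by the argument tuple passed to search().
        self._cache = {}
        self._map = map_func

    def add_map(self, map_func):
        self._map = map_func

    def _apply(self, func, args, itr):
        def wrapper(v):
            # v is a tuple (e.g. a (key, value) pair); args is appended to it.
            return func(*(v + args))
        # Drop falsy results so the function can skip records.
        return ifilter(None, imap(wrapper, itr))

    def build_cache(self):
        for k in self._cache.iterkeys():
            self.search(k)

    def search(self, arg_tuple=None, reduce=None):
        arg_tuple = arg_tuple or tuple()
        if arg_tuple not in self._cache:
            # Note: the cached value is an iterator, so it can be consumed once.
            self._cache[arg_tuple] = self._apply(
                self._map, arg_tuple, self.data.iteritems()
            )
        if reduce:
            # args must be a tuple because wrapper() concatenates it to each
            # cached result, which therefore has to be a tuple as well.
            return self._apply(reduce, tuple(), self._cache[arg_tuple])
        return self._cache[arg_tuple]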
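
class ProcessFunction(object):
    """Callable object (not a closure) so it can be pickled for the workers."""

    def __init__(self, func, args):
        self.f = func
        self.args = args

    def __call__(self, value):
        return self.f(*(value + self.args))

class ProcMappedIndex(MappedIndex):
    def _apply(self, func, args, itr):
        p = Pool()
        f = ProcessFunction(func, args)
        try:
            # Pool.map blocks until every item is processed and returns a list.
            results = p.map(f, itr)
        finally:
            p.close()
            p.join()
        return ifilter(None, results)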
There are three different types of indexes here. The RawIndex is basically a super thin wrapper around a typical bsddb object. The MappedIndex allows searching the records using a MapReduce-ish pattern. The idea is that you pass in a callable that takes the key and value, along with any other arguments you want to pass, and you get back an iterator over the truthy results. The ProcMappedIndex allows you to run the map and reduce functions in parallel using the multiprocessing module.
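To make the map pattern a bit more concrete, here is a minimal sketch of the same idea outside the classes. It makes a few assumptions: a plain dict stands in for the bsddb data, Python 3's built-in `filter` replaces `ifilter`, and `apply_map` and `value_starts_with` are hypothetical names used only for illustration.

```python
def apply_map(func, args, items):
    """Call func(key, value, *args) per record and drop falsy results."""
    return filter(None, (func(*(item + args)) for item in items))


def value_starts_with(key, value, prefix):
    # Hypothetical map function: return the key on a match, None to skip.
    return key if value.startswith(prefix) else None


# A plain dict stands in for the bsddb BTree in this sketch.
records = {"1": "apple", "2": "banana", "3": "avocado"}
matches = list(apply_map(value_starts_with, ("a",), records.items()))
print(matches)
```

The extra arguments travel as a tuple so they can be concatenated onto each `(key, value)` pair, which is the same trick the `wrapper` function in `_apply` relies on.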
I experimented with using another bsddb for the in-memory cache, but that didn't allow me to store the iterators or the argument tuple as easily. I could pickle them or something similar, but it seemed pointless since the searches seem like they'd never really hit the performance limits of a Python dictionary (which is fast). That said, I think my next toy might be a memcache type of application that uses an in-memory bsddb for keeping key/value pairs.
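As a rough sketch of the dictionary cache idea, here is a stripped-down version keyed by the argument tuple. It is not the code above: `CachedSearch` and `longer_than` are hypothetical names, a plain dict stands in for the bsddb data, and I've assumed it is acceptable to store a list instead of an iterator so that a repeated search sees the same results.

```python
class CachedSearch(object):
    """Hypothetical helper: caches search results keyed by the argument tuple."""

    def __init__(self, records, map_func):
        self.records = records      # plain dict standing in for the bsddb data
        self.map_func = map_func
        self._cache = {}
        self.misses = 0             # counts how often the map actually runs

    def search(self, arg_tuple=()):
        if arg_tuple not in self._cache:
            self.misses += 1
            results = (self.map_func(k, v, *arg_tuple)
                       for k, v in self.records.items())
            # Store a list so repeated searches see the same results.
            self._cache[arg_tuple] = [r for r in results if r]
        return iter(self._cache[arg_tuple])


def longer_than(key, value, n):
    # Hypothetical map function: keep keys whose value is longer than n.
    return key if len(value) > n else None


cache = CachedSearch({"a": "xx", "b": "xxxx"}, longer_than)
first = list(cache.search((3,)))
second = list(cache.search((3,)))
```

Tuples are hashable, so they work as dictionary keys without any pickling, which is most of why the plain dict was the easier choice here.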
I do want to point out that the multiprocessing module is really pretty awesome. I know there are some limitations on Windows and it is a relatively new module (bugs?), but the fact that it makes things like performing map/imap operations on an iterator so easy via its built-in Pool is simply awesome. One thing, though, is that when I did try to use the imap function, it would hang. I think there must have been a deadlock or something that I didn't understand, so if there was an obvious reason, please let me know.