Recipe for DynamicSetPartitioner #386

@se7entyse7en

Hi, I was looking at the SetPartitioner recipe and it's awesome. In my case, though, the set of items to partition is not fixed but changes dynamically. What I've done so far is subclass SetPartitioner as a DynamicSetPartitioner class that also triggers the allocation when the items change. I added a path containing the items and a watcher for that path, and I refresh the _set attribute every time _allocate_transition is triggered. It looks something like this:

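import threading
from functools import partial

from kazoo.recipe import partitioner
from kazoo.client import KazooClient
from kazoo.recipe.watchers import PatientChildrenWatch


class DynamicSetPartitioner(partitioner.SetPartitioner):

    def __init__(self, client, path, **kwargs):
        lock_path = kwargs.pop('lock_path', None)
        party_path = kwargs.pop('party_path', None)
        set_path = kwargs.pop('set_path', None)

        # self._path is not set until the base __init__ runs, so build the
        # default from the `path` argument instead.
        self._set_path = set_path or '/'.join([path, 'set'])
        client.ensure_path(self._set_path)

        # Create the lock first: the base __init__ starts the watchers, which
        # may call _allocate_transition before __init__ returns.
        self._allocation_lock = threading.Lock()

        super(DynamicSetPartitioner, self).__init__(
            client, path, set(), **kwargs)

        # These overrides take effect only after the base __init__ has run, so
        # anything it already built from the default paths is unaffected.
        self._lock_path = lock_path or self._lock_path
        self._party_path = party_path or self._party_path

    def _child_watching(self, func=None, client_handler=False):
        # `client_handler` was called `async` in older kazoo releases; `async`
        # is a reserved word from Python 3.7 on.
        party_watcher = PatientChildrenWatch(self._client, self._party_path,
                                             self._time_boundary)

        set_watcher = PatientChildrenWatch(self._client, self._set_path,
                                           self._time_boundary)

        if func is not None and client_handler:
            # Wrap once, outside the loop, so the callback is not nested
            # inside a second spawn for the second watcher.
            func = partial(self._client.handler.spawn, func)

        for watcher in (party_watcher, set_watcher):
            asy = watcher.start()
            if func is not None:
                asy.rawlink(func)

    def _allocate_transition(self, result):
        with self._allocation_lock:
            # Re-read the items so the allocation reflects the current set.
            self._set = self._client.get_children(self._set_path)
            return super(DynamicSetPartitioner, self)._allocate_transition(result)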
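
To illustrate why the allocation has to be redone when the items change, here is a minimal sketch that needs no ZooKeeper at all. It assumes the stride-based split that, as far as I understand, SetPartitioner applies by default (sort the items, sort the party members, take every n-th item starting at the member's own index). The function name `stride_partition` and the worker and item names are made up for the example.

```python
def stride_partition(identifier, members, items):
    """Return the items owned by `identifier` under a stride-based split.

    Assumption: this mirrors the default partitioning strategy; it is not
    taken from the recipe itself.
    """
    all_items = sorted(items)
    workers = sorted(members)
    i = workers.index(identifier)
    # Start at this worker's index and skip over the other workers.
    return all_items[i::len(workers)]


members = ["worker-a", "worker-b"]  # hypothetical party members
items = {"q1", "q2", "q3", "q4"}    # hypothetical children of the set path

before = {m: stride_partition(m, members, items) for m in members}

# A new item shows up under the set path.
items.add("q0")
after = {m: stride_partition(m, members, items) for m in members}

print(before)
print(after)
```

Adding a single item shifts the ownership of every other item, so each member has to release its locks and reallocate rather than just pick up the new entry.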

If you think this could be useful, I'd like to contribute it.
