In some workloads with highly compressible data, we would like to automatically trade some computation time for more in-memory storage. Dask workers store data in a MutableMapping (the abstract interface that dict implements). So in principle all we would need to do is write a MutableMapping subclass that overrides the __getitem__ and __setitem__ methods to compress and decompress data on demand.
This would be an interesting and useful task for someone who wants to help Dask and learn some internals but doesn't know a lot just yet, since it doesn't require deep incidental Dask knowledge. I'm marking this as a good first issue.
Here is a conceptual prototype of such a MutableMapping. This is completely untested, but maybe gives a sense of how I think about this problem. It's probably not ideal though so I would encourage others to come up with their own design.
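import collections
from collections.abc import MutableMapping
from typing import Callable, Dict, Tuple


class TypeCompression(MutableMapping):
    def __init__(
        self,
        types: Dict[type, Tuple[Callable, Callable]],
        storage=dict,
    ):
        # Maps a value type to its (compress, decompress) pair
        self.types = types
        # One underlying mapping per value type
        self.storage = collections.defaultdict(storage)

    def __setitem__(self, key, value):
        # Drop any older entry for this key, possibly stored under another type
        for d in self.storage.values():
            d.pop(key, None)
        typ = type(value)
        if typ in self.types:
            compress, decompress = self.types[typ]
            value = compress(value)
        self.storage[typ][key] = value

    def __getitem__(self, key):
        for typ, d in self.storage.items():
            if key in d:
                value = d[key]
                break
        else:
            raise KeyError(key)
        if typ in self.types:
            compress, decompress = self.types[typ]
            value = decompress(value)
        return value

    # MutableMapping also requires the three methods below
    def __delitem__(self, key):
        for d in self.storage.values():
            if key in d:
                del d[key]
                return
        raise KeyError(key)

    def __iter__(self):
        for d in self.storage.values():
            yield from d

    def __len__(self):
        return sum(len(d) for d in self.storage.values())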
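For illustration only, here is one way the types argument of the prototype above might be filled in. Going by its type annotation, it appears to map Python value types to (compress, decompress) pairs. The choice of zlib and the handling of str are assumptions made for this sketch, not something the issue prescribes.

```python
import zlib
from typing import Callable, Dict, Tuple

# Keys are Python value types; values are (compress, decompress) pairs.
# zlib is only a stand-in; any matching pair of callables would do.
types: Dict[type, Tuple[Callable, Callable]] = {
    bytes: (zlib.compress, zlib.decompress),
    str: (
        lambda s: zlib.compress(s.encode("utf-8")),
        lambda b: zlib.decompress(b).decode("utf-8"),
    ),
}

# Look up the pair by the type of the value, as the prototype does
value = "dask " * 10_000
compress, decompress = types[type(value)]
packed = compress(value)
restored = decompress(packed)
```

Values whose type has no entry would simply be stored as they are.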
This came up in https://github.com/dask/distributed/pull/3624 . cc @madsbk and @jakirkham from that PR. cc also @eric-czech who was maybe curious about automatic compression/decompression.
People looking at compression might want to look at and use Dask's serialization and compression machinery in distributed.protocol (maybe start by looking at the dumps, serialize and maybe_compress functions).
Also cc @prasunanand and @andersy005 who have both asked about good first issues in the past. I think that this would be fun.
@mrocklin I would love to work in it. :)
Great!
Hi, I need a little help.
Do I need to modify the logic in dumps and loads?
Does types in TypeCompress refer to int, double, etc., or to snappy, blosc, lz4, etc.? If it refers to int, double, etc., where are the corresponding compressors and decompressors?
cc @jrbourbeau
Does types in TypeCompress refer to int, double, etc., or to snappy, blosc, lz4, etc.?
You don't have to use the structure I started with. I encourage you to think about this on your own and how you would design it. If you blindly follow my design you probably won't develop a high-level understanding of the problem. What I put up there was just an idea, and not a fully formed one. Whoever solves this task will need to think a lot more about the problem than I did.
To add some more context here, this is an object that would replace the MutableMapping currently used in Worker.data. It would expect to receive any user-generated Python object as a value. We would want to take those values, serialize them, and maybe compress them when we put them into the underlying mapping.
So for example we would want something like the following to work:
import numpy as np

x = np.ones(1000000)  # a large but compressible piece of data
d = MyMapping()  # MyMapping stands for whatever class solves this issue
d["x"] = x  # put Python object into d
out = d["x"]  # get the object back out
assert str(out) == str(x)  # the two objects should match
# assuming here that the underlying bytes are stored in something like a `.storage` attribute, but this isn't required
# we check that the amount of actual data stored is small
assert sum(map(len, d.storage.values())) < x.nbytes
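As a rough illustration of the behaviour described above, here is a minimal standard-library sketch. It is a stand-in under stated assumptions, not Dask's implementation: the class name CompressedMapping, the one-byte flag format, and the use of pickle and zlib (instead of the machinery in distributed.protocol) are all hypothetical choices, and an array.array replaces the NumPy array so the example has no dependencies.

```python
import pickle
import zlib
from array import array
from collections.abc import MutableMapping


class CompressedMapping(MutableMapping):
    """Hypothetical mapping that pickles values and compresses them when that helps."""

    def __init__(self):
        # key -> bytes; the first byte flags whether the rest is compressed
        self.storage = {}

    def __setitem__(self, key, value):
        raw = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
        packed = zlib.compress(raw)
        # "Maybe compress": keep the compressed form only if it is smaller
        if len(packed) < len(raw):
            self.storage[key] = b"\x01" + packed
        else:
            self.storage[key] = b"\x00" + raw

    def __getitem__(self, key):
        blob = self.storage[key]  # raises KeyError for missing keys
        body = blob[1:]
        if blob[:1] == b"\x01":
            body = zlib.decompress(body)
        return pickle.loads(body)

    def __delitem__(self, key):
        del self.storage[key]

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)


# A large but highly compressible value (stdlib stand-in for np.ones)
x = array("d", [1.0]) * 100_000
d = CompressedMapping()
d["x"] = x  # serialized and compressed on the way in
out = d["x"]  # decompressed and deserialized on the way out
stored_bytes = sum(map(len, d.storage.values()))
```

A real implementation would probably reuse Dask's own serialization and compression rather than pickle and zlib, and might cache recently used values to avoid repeated decompression.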
In Dask one would test this by putting it into a Worker:
import dask.array as da
import pytest
from distributed import Client, Scheduler, Worker


@pytest.mark.asyncio
async def test_compression():
    async with Scheduler() as s:
        # bind the worker to a name so its data can be inspected below
        async with Worker(s.address, data=MyMapping) as worker:
            async with Client(s.address, asynchronous=True) as c:
                x = da.ones((10000, 10000))
                y = await x.persist()  # put data in memory
                y = await (x + x.T).mean().persist()  # do some work
                assert sum(map(len, worker.data.storage.values())) < x.nbytes
(None of the code here was tested, and it may have bugs. I wouldn't trust it too much.)
The mutable mapping looks like something which would be well suited for https://github.com/dask/zict. The integration into distributed would then look similar to how spill-to-disk is implemented at the moment.
PR ( https://github.com/dask/distributed/pull/3702 ) seems to be going in the right direction. Probably the best place to move this forward atm.