Mongoengine: Aggregation framework - Brainstorm

Created on 4 Jul 2014 · 10 Comments · Source: MongoEngine/mongoengine

My initial idea for an aggregation framework in MongoEngine.

See the mockup:

from mongoengine import fields, Document, EmbeddedDocument, aggregation


class Tag(EmbeddedDocument):
    name = fields.StringField()


class Animal(Document):
    name = fields.StringField()
    size = fields.IntField()
    tags = fields.ListField(fields.EmbeddedDocumentField(Tag))


# Aggregation pipelines:
# aggregation.Limit
# aggregation.Project
# aggregation.Skip
# aggregation.Unwind
# aggregation.Group
# aggregation.Sort
# aggregation.Near
# aggregation.Out
#
# Hard pipelines to implement
# aggregation.Redact

# Aggregation operators:
# aggregation.AddToSet
# aggregation.First
# aggregation.Last
# aggregation.Max
# aggregation.Min
# aggregation.Avg
# aggregation.Push
# aggregation.Sum
# Other operators:
# http://docs.mongodb.org/manual/meta/aggregation-quick-reference/


results = Animal.objects.aggregate(
    aggregation.Limit(5),
    aggregation.Skip(2),
    aggregation.Project('name', 'size'),
    aggregation.Unwind('tags'),  # the list field on Animal is named "tags"
    aggregation.Group(
        'tags',  # id field
        totalSize=aggregation.Sum('size'),
    )
)
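
A rough sketch of how the stage objects in the mockup might translate into the dictionaries MongoDB expects. None of these classes exist in MongoEngine; the class names follow the mockup, while `Stage`, `to_mongo` and `compile_pipeline` are hypothetical names introduced only for illustration.

```python
class Stage:
    """Base class: one pipeline stage that renders to {operator: value}."""
    operator = None

    def __init__(self, value):
        self.value = value

    def to_mongo(self):
        return {self.operator: self.value}


class Limit(Stage):
    operator = '$limit'


class Skip(Stage):
    operator = '$skip'


class Unwind(Stage):
    operator = '$unwind'

    def __init__(self, field):
        # Field paths are written with a leading "$" in pipeline stages.
        super().__init__('$' + field)


class Project(Stage):
    operator = '$project'

    def __init__(self, *fields):
        super().__init__({name: 1 for name in fields})


class Sum:
    """Accumulator used inside Group; hypothetical, mirrors aggregation.Sum."""

    def __init__(self, field):
        self.field = field

    def to_mongo(self):
        return {'$sum': '$' + self.field}


class Group(Stage):
    operator = '$group'

    def __init__(self, id_field, **accumulators):
        spec = {'_id': '$' + id_field}
        for name, accumulator in accumulators.items():
            spec[name] = accumulator.to_mongo()
        super().__init__(spec)


def compile_pipeline(*stages):
    """Render stage objects into a list of plain pipeline dictionaries."""
    return [stage.to_mongo() for stage in stages]


pipeline = compile_pipeline(
    Limit(5),
    Skip(2),
    Project('name', 'size', 'tags'),
    Unwind('tags'),
    Group('tags', totalSize=Sum('size')),
)
```

The resulting `pipeline` is a plain list of dictionaries, so it could be handed to any API that accepts a raw aggregation pipeline.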
Aggregation Framework Discussion

Most helpful comment

Maybe it would be better to leave aggregation as a low-level method (because it already has a rather good interface) and just proxy the call to http://api.mongodb.org/python/current/api/pymongo/collection.html#pymongo.collection.Collection.aggregate with some exceptions:

  • if the current queryset is already filtered, automatically prepend a $match operator to the aggregation pipeline
  • the $limit and $skip aggregation operators can be integrated with the current BaseQuerySet.__getitem__ implementation as two optional operators appended to the pipeline
  • the same goes for $sort and BaseQuerySet.order_by

We use mongoengine in one of our projects but due to the lack of aggregation framework support we have to do something like this:

    db = getattr(mongo, settings.MONGODB_DEFAULT_DATABASE)
    cursor = db.our_collection.aggregate([
        {
            '$match': qs._query,
        },
        {
            '$group': {
                '_id': fields,
                'count': {'$sum': 1},
                'avg_price': {'$avg': '$data.price'},
                # ...
            }
        },
        {
            '$sort': {'_id': 1}
        }
    ])
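    # PyMongo 2.x returned a dict with a 'result' key; PyMongo 3+ returns a
    # cursor instead, where the equivalent is list(cursor)
    groups = cursor['result']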

Here mongo is an instance of MongoClient and qs is an instance of mongoengine queryset.
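
The three exceptions listed in this comment could be sketched as a plain function that wraps user-supplied stages with stages derived from queryset state. This is only an illustration: `build_pipeline` and its parameters are hypothetical, not part of MongoEngine, and the sketch follows the comment's wording by appending the sort, skip and limit stages after the user's stages, which is a design choice rather than a given.

```python
def build_pipeline(user_stages, query=None, ordering=None, skip=None, limit=None):
    """Combine user stages with stages derived from a queryset's state.

    query    -- raw filter dict (what the thread reads from qs._query)
    ordering -- order_by-style keys, e.g. ['-price', 'name']
    skip     -- number of documents to skip (from slicing), or None
    limit    -- maximum number of documents (from slicing), or None
    """
    pipeline = []

    # An already-filtered queryset contributes a leading $match stage.
    if query:
        pipeline.append({'$match': query})

    pipeline.extend(user_stages)

    # order_by keys: a leading "-" means descending, "+" or nothing ascending.
    if ordering:
        sort_spec = {}
        for key in ordering:
            direction = -1 if key.startswith('-') else 1
            sort_spec[key.lstrip('+-')] = direction
        pipeline.append({'$sort': sort_spec})

    # Slicing such as qs[2:7] would map to skip=2, limit=5.
    if skip:
        pipeline.append({'$skip': skip})
    if limit is not None:
        pipeline.append({'$limit': limit})

    return pipeline


example = build_pipeline(
    [{'$group': {'_id': '$kind', 'count': {'$sum': 1}}}],
    query={'size': {'$gt': 3}},
    ordering=['-count'],
    skip=2,
    limit=5,
)
```

Because the function only builds a list of dictionaries, the result could be passed unchanged to PyMongo's `Collection.aggregate`.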

All 10 comments

+1

But in that case we are not able to work with Collection objects.
It would be nice to have something to do that.
In any case we can use raw queries.

+1

I find that I'm having to write lengthy queries and then cast the results to objects, which is not the cleanest code.

@rlkelly do you have an example of what you're doing right now? What results do you get and what type of objects would you expect it to return? The more use cases we gather here, the more informed a decision we can make.

This would be an example of a lengthy aggregation:

    feed = list(Media.objects(owner__in=Follower.objects(follower=g.user, deleted__ne=True).values_list('followee')).aggregate(
        {'$sort': {'modified_date': 1, 'owner': 1}},
        {'$group': {'_id': '$owner',
            'modified': {'$last': {'$dateToString': {'format': '%m/%d %H:%M', 'date': '$modified_date'}}},
            'new_media_count': {'$sum': 1},
            'last_moment_title': {'$last': '$title'},
            'preview_url': {'$last': '$url'},
            'id': {'$last': '$_id'}}
        }
    ))

@rlkelly could you also include an example of a result of such aggregation (obviously removing/obfuscating any sensitive data)?

Say I had a bunch of users that created media, and I wanted to select all media from my followers and then group it by user. I'd start with

Media.objects(owner__in=my_followers_list)

but then I'd want to aggregate this like:

[{'user1': [{'media1': {____}, 'media2': {____}, etc.}], 'user2': [{'media1': {____}, 'media2': {____}, etc.}]}]

It's very easy to do with MongoDB syntax, but inside a Python application readability suffers, and I end up using the query itself to define the schema of the response. I think it'd be much cleaner if I could define the output schema as a method on the Document object and build the aggregation query in Pythonic terms.

Hopefully that's more clear! Thanks for the response.
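
For reference, the per-user grouping described above can be approximated with a `$group` stage that pushes each whole document (`$$ROOT`, available since MongoDB 2.6) into a list, followed by a small reshaping step in Python. This is a minimal sketch: the function names are hypothetical, and the field names `owner` and `modified_date` are taken from the earlier example in this thread.

```python
def media_by_owner_pipeline():
    """Stages that collect every matched media document under its owner."""
    return [
        # Sort first so each owner's list comes out in modification order.
        {'$sort': {'modified_date': 1}},
        {'$group': {'_id': '$owner', 'media': {'$push': '$$ROOT'}}},
    ]


def rows_to_mapping(rows):
    """Turn [{'_id': owner, 'media': [...]}, ...] into {owner: [...]}."""
    return {row['_id']: row['media'] for row in rows}
```

With a queryset such as `Media.objects(owner__in=my_followers_list)`, the stages would be handed to `aggregate` and the returned rows to `rows_to_mapping`. Depending on the MongoEngine version, the stages are passed either as separate arguments (as in the example earlier in the thread) or as a single list. The values are raw dictionaries, not Document instances, so converting them back to objects remains a separate step.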

I faced the same issue in a project so I decided to implement a module that extends the aggregation functionality of mongoengine. You can find more information at my repo: https://github.com/MosesSymeonidis/aggregation_builder and as an example the initial example of this post could be implemented like this:

from mongoengine import fields, Document, EmbeddedDocument
from aggregation_builder import AggregateQuerySet
from aggregation_builder.operators import SUM

class Tag(EmbeddedDocument):
    name = fields.StringField()


class Animal(Document):

    meta = {'queryset_class': AggregateQuerySet}

    name = fields.StringField()
    size = fields.IntField()
    tags = fields.ListField(fields.EmbeddedDocumentField(Tag))

results = (
    Animal.objects.aggregation_builder
    .limit(5)
    .skip(2)
    .project('name', 'size')
    .unwind('$tags')
    .group('$tags', totalSize=SUM('$size'))
)
