Graphene: Is there a good way to ask if a query contains a certain field?

Created on 3 Nov 2016 · 12 Comments · Source: graphql-python/graphene

for example:

query {
   viewer {
       id
       friends
    }
}

I want resolve_viewer to be able to ask if we're querying for friends under viewer so it can do some prefetching...
Also needs to support fragments because the query above could be written as:

query {
   viewer {
       ...F1
    }
}
fragment F1 on ViewerType {
   id
   friends
}

All 12 comments

I am doing something similar in a related project. I think it's something that's probably better handled outside the resolve function. You do get the info argument passed to resolve_viewer, but to my knowledge, it's not easy to parse and would be way too fragile as it may change depending on the query.

A better approach could be to return a lazy loading object that resolves the viewer from DB when a non-id field is accessed. You would return ViewerProxy(user_id, UserClass) instead of the real User. When the underlying graphene code tries to get friends from your user object, the "prefetch" would trigger.

from wrapt import ObjectProxy

class ViewerProxy(ObjectProxy):
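    def __init__(self, id, obj):
        super(ViewerProxy, self).__init__(obj)
        # wrapt keeps only "_self_"-prefixed attributes on the proxy itself;
        # other attribute assignments are forwarded to the wrapped object.
        self._self_id = id
        self._self_model = obj

    def __getattr__(self, name):
        if name == "id":
            return self._self_id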
        if not name.startswith('_') and isinstance(self.__wrapped__, type):
            self.__wrapped__ = [THIS IS WHERE YOU QUERY THE REAL THING]
        return super(ViewerProxy, self).__getattr__(name)

Another thing that comes to mind: I don't know what your setup is for authentication, but most likely the viewer has already been queried as part of your authentication/session logic. In that case you have already made a query to the DB by the time you hit resolve_viewer, and you'd be returning g.user (for Flask) or request.user (for Django), so I don't know if you'd gain anything from this approach.

Hey @yfilali,

Say my query is:

{
   viewer {
      friends {
          firstName
          lastName
      }
   }
}

If I use an object proxy, then for a given user, resolve_friends will only query for the user's friend IDs and return a list of proxy objects. Each proxy object will then fetch itself to resolve firstName and lastName.
This will result in N database calls.
I would want to make a single call to fetch all friends...
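
To make the cost concrete, here is a minimal sketch of the two access patterns. FakeDB, get_user and get_users are hypothetical names for an in-memory stand-in that only counts round trips; they are not part of graphene or any real database API.

```python
class FakeDB(object):
    """Hypothetical in-memory store that counts round trips."""

    def __init__(self, users):
        self.users = users
        self.calls = 0

    def get_user(self, user_id):
        # One round trip per user.
        self.calls += 1
        return self.users[user_id]

    def get_users(self, user_ids):
        # One round trip for the whole batch.
        self.calls += 1
        return [self.users[user_id] for user_id in user_ids]


users = {
    i: {"firstName": "First%d" % i, "lastName": "Last%d" % i}
    for i in range(1, 11)
}
friend_ids = list(range(1, 11))

# Proxy-per-friend behaviour: every friend loads itself on first access.
lazy_db = FakeDB(users)
lazy_friends = [lazy_db.get_user(friend_id) for friend_id in friend_ids]

# Eager loading: all friends are fetched together.
batched_db = FakeDB(users)
batched_friends = batched_db.get_users(friend_ids)
```

Both lists hold the same data, but lazy_db.calls grows with the number of friends while batched_db.calls stays at one.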

Not so! Notice that I have a test for that: "and isinstance(self.__wrapped__, type)".

If the wrapped object is a type, we query and convert it to an instance. This makes sure we only query once.
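
The "query once" guard can be sketched without wrapt as well. This is only an illustration under assumptions: LazyViewer, load_user and the returned names are hypothetical, and the loader stands in for whatever query fetches the real object.

```python
from types import SimpleNamespace


class LazyViewer(object):
    """Hypothetical proxy: knows its id up front, loads the rest on demand."""

    def __init__(self, id, loader):
        self.id = id
        self._loader = loader
        self._loaded = None

    def __getattr__(self, name):
        # Only called when normal lookup fails, so "id" never reaches here.
        if name.startswith("_"):
            raise AttributeError(name)
        if self._loaded is None:
            # First access to a non-id field triggers the single load.
            self._loaded = self._loader(self.id)
        return getattr(self._loaded, name)


calls = []


def load_user(user_id):
    # Stand-in for the real database query; records each invocation.
    calls.append(user_id)
    return SimpleNamespace(firstName="Ada", lastName="Lovelace")


viewer = LazyViewer(1, load_user)
viewer_id = viewer.id
calls_after_id = len(calls)
full_name = viewer.firstName + " " + viewer.lastName
```

Reading id does not touch the loader, and reading both name fields triggers it exactly once.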

Edit: sorry, I think I misunderstood at first. See how I do it here: RelationshipResultList
https://github.com/yfilali/graphql-pynamodb/blob/master/graphene_pynamodb/relationships.py

once per type... if friends is an array of 10 proxies....

Maybe this describes the problem better:
http://guides.rubyonrails.org/active_record_querying.html#eager-loading-associations

Yes, in your case, the sequence would go something like this:
1) resolve_viewer => return a viewer proxy object
2) resolve_friends is called and passed the previously resolved viewer proxy object as the root (or self) first argument. At this point, no query was made yet. Now you know that friends is accessed and do the optimal query for viewer + friends.

Here is the same thing in code. Does something like this work for you?

    import graphene

    class UserModel(object):
        # Stand-in for the real model class that the proxy wraps.
        pass

    class Friend(graphene.ObjectType):
        class Meta:
            interfaces = (graphene.relay.Node,)

        firstName = graphene.String()
        lastName = graphene.String()

    class User(graphene.ObjectType):
        class Meta:
            interfaces = (graphene.relay.Node,)

        firstName = graphene.String()
        lastName = graphene.String()
        friends = graphene.List(Friend)

        # graphene 1.x resolver signature: (root, args, context, info)
        def resolve_friends(root, args, context, info):
            # root is the ViewerProxy returned by resolve_viewer.
            print(root.id)  # == 1

    class Query(graphene.ObjectType):
        viewer = graphene.Field(User)

        def resolve_viewer(self, args, context, info):
            return ViewerProxy(1, UserModel)

    schema = graphene.Schema(query=Query)

    result = schema.execute('{viewer { friends { firstName, lastName } } }')

I don't think I follow this solution ...
What if your query is:

{
  viewer { 
    firstName
    lastName
    friends { firstName, lastName } 
  } 
}

The __getattr__ method will be called for firstName. How do I know at that point that I need to fetch the user from the DB together with its friends?

@ekampf you can do something like the following (extracted from a PR that optimizes queries in graphene-django).

from graphql import GraphQLList, GraphQLNonNull

def get_type(_type):
    if isinstance(_type, (GraphQLList, GraphQLNonNull)):
        return get_type(_type.of_type)
    return _type

def get_fields(info):
    field_asts = info.field_asts[0].selection_set.selections
    _type = get_type(info.return_type)

    for field_ast in field_asts:
        field_name = field_ast.name.value
        yield field_name
        # You can also do:
        # field_def = get_field_def(info.schema, _type, field_ast)
        # yield field_def.resolver # This will get the field resolver
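
As a rough illustration of how a resolver might use such a field list to decide whether to prefetch, here is a minimal sketch. It assumes the graphene 1.x resolver signature used in this thread; requested_fields, fake_field and the returned dict are hypothetical, and the info object is a stand-in built with SimpleNamespace so the snippet runs without graphene installed.

```python
from types import SimpleNamespace


def requested_fields(info):
    """Names of the fields selected directly under the field being resolved."""
    selections = info.field_asts[0].selection_set.selections
    return [selection.name.value for selection in selections]


def resolve_viewer(root, args, context, info):
    # Hypothetical resolver: decide up front whether friends are needed.
    wants_friends = "friends" in requested_fields(info)
    # A real resolver would pick a prefetching query here instead.
    return {"id": 1, "friends_prefetched": wants_friends}


def fake_field(name, children=()):
    """Build a stand-in for a Field AST node (assumption, not the real class)."""
    selection_set = None
    if children:
        selection_set = SimpleNamespace(selections=list(children))
    return SimpleNamespace(
        name=SimpleNamespace(value=name), selection_set=selection_set
    )


# Stand-in for the info of: query { viewer { id friends } }
info = SimpleNamespace(
    field_asts=[fake_field("viewer", [fake_field("id"), fake_field("friends")])]
)
viewer = resolve_viewer(None, {}, None, info)
```

This only looks at direct children and does not expand fragments, so a query that selects friends through a fragment spread would not be detected by it.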

Wow, that's a much less flawed approach than mine!

@syrusakbary What about spreading fragments, as in @ekampf's initial question? Something like this?

    from graphql import GraphQLList, GraphQLNonNull
    from graphql.language.ast import FragmentSpread

    def get_type(_type):
        if isinstance(_type, (GraphQLList, GraphQLNonNull)):
            return get_type(_type.of_type)
        return _type

    def get_fields(info):
        fragments = info.fragments
        field_asts = info.field_asts[0].selection_set.selections
        _type = get_type(info.return_type)

        for field_ast in field_asts:
            field_name = field_ast.name.value
            if isinstance(field_ast, FragmentSpread):
                for field in fragments[field_name].selection_set.selections:
                    yield field.name.value
                continue

            yield field_name

@ekampf the solution provided by @yfilali or myself should work for knowing the requested fields.
Feel free to reopen the issue if not! :)

Ended up with the following introspection code (based on the code above, but allows looking several levels deep into the query):

def get_field_names(info):
    """
    Parses a query info into a list of composite field names.
    For example the following query:
        {
          carts {
            edges {
              node {
                id
                name
                ...cartInfo
              }
            }
          }
        }
        fragment cartInfo on CartType { whatever }

    Will result in an array:
        [
            'carts',
            'carts.edges',
            'carts.edges.node',
            'carts.edges.node.id',
            'carts.edges.node.name',
            'carts.edges.node.whatever'
        ]
    """
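    from graphql.language.ast import FragmentSpread, InlineFragment
    fragments = info.fragments

    def iterate_field_names(prefix, field):
        if isinstance(field, FragmentSpread):
            # Named fragment: expand its definition under the current prefix.
            results = []
            new_prefix = prefix
            sub_selection = fragments[field.name.value].selection_set.selections
        elif isinstance(field, InlineFragment):
            # Inline fragments ("... on Type { ... }") have no name attribute.
            results = []
            new_prefix = prefix
            sub_selection = field.selection_set.selections
        else:
            name = field.name.value
            results = [prefix + name]
            new_prefix = prefix + name + "."
            sub_selection = field.selection_set.selections if field.selection_set else []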

        for sub_field in sub_selection:
            results += iterate_field_names(new_prefix, sub_field)

        return results

    results = iterate_field_names('', info.field_asts[0])
    return results
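
For the query from the original question, the same walk can answer "is friends requested under viewer?" with a simple membership test. The sketch below is an assumption-laden illustration: it dispatches on class names and uses small stand-in classes (Name, Field, FragmentSpread, SelectionSet, FragmentDefinition, Info) so that it runs without graphql-core installed. In real code you would use isinstance against the classes in graphql.language.ast. selected_paths is a hypothetical name.

```python
class Name(object):
    def __init__(self, value):
        self.value = value


class SelectionSet(object):
    def __init__(self, selections):
        self.selections = selections


class Field(object):
    """Stand-in for a field node; leaf fields have selection_set = None."""

    def __init__(self, name, selections=None):
        self.name = Name(name)
        self.selection_set = SelectionSet(selections) if selections else None


class FragmentSpread(object):
    """Stand-in for a "...F1" spread node."""

    def __init__(self, name):
        self.name = Name(name)


class FragmentDefinition(object):
    def __init__(self, selections):
        self.selection_set = SelectionSet(selections)


class Info(object):
    def __init__(self, field_asts, fragments):
        self.field_asts = field_asts
        self.fragments = fragments


def selected_paths(info):
    """Return the set of dotted field paths, expanding fragments."""
    paths = set()

    def walk(prefix, node):
        kind = type(node).__name__
        if kind == "FragmentSpread":
            # Expand the named fragment under the current prefix.
            children = info.fragments[node.name.value].selection_set.selections
            new_prefix = prefix
        elif kind == "InlineFragment":
            children = node.selection_set.selections
            new_prefix = prefix
        else:
            path = prefix + node.name.value
            paths.add(path)
            new_prefix = path + "."
            children = node.selection_set.selections if node.selection_set else []
        for child in children:
            walk(new_prefix, child)

    for field_ast in info.field_asts:
        walk("", field_ast)
    return paths


# Stand-in for: query { viewer { ...F1 } }  fragment F1 on ViewerType { id friends }
info = Info(
    field_asts=[Field("viewer", [FragmentSpread("F1")])],
    fragments={"F1": FragmentDefinition([Field("id"), Field("friends")])},
)
paths = selected_paths(info)
wants_friends = "viewer.friends" in paths
```

Returning a set makes the membership check cheap and tolerates the same field being reached through more than one fragment.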
