sqlalchemy - limit the joinedloaded results

  • Last Update :
  • Techknowledgy :

I think what you want can be achieved by using a DISTINCT ON clause to remove the duplicate rows retrieved from the Goal object:

query = (
    # Select from Goal and join all the required tables
    select(Goal)
    .join(Goal.position)
    .join(Position.player)
    .join(Player.team)
    # Remove duplicate rows based on the Player id (rendered as DISTINCT ON)
    .distinct(Player.id)
    # Order by `Player.id` (required for distinct) and descending on the
    # goal id to have the latest added goals (newest) first
    .order_by(Player.id, Goal.id.desc())
)

When using the sample data below this results in:

{
   "id": 3,
   "players": [{
         "id": 5,
         "positions": [{
            "id": 7,
            "goals": [{
               "id": 13,
            }]
         }]
      },
      {
         "id": 1,
         "positions": [{
            "id": 1,
            "goals": [{
               "id": 16,
            }]
         }]
      }
   ]
}

You can also turn the query with the DISTINCT ON clause around which results in:

query = (
    # Select all the required tables
    select(Team, Player, Position, Goal)
    # outerjoin all required tables resulting in a `LEFT OUTER JOIN`
    .outerjoin(Team.players)
    .outerjoin(Player.positions)
    .outerjoin(Position.goals)
    # Remove duplicate rows based on the Player id (rendered as DISTINCT ON)
    .distinct(Player.id)
    # Order by `Player.id` (required for distinct) and descending on the
    # goal id to have the latest added goals (newest) first
    .order_by(Player.id, Goal.id.desc())
)

Sample data

{
   "id": 3,
   "players": [{
         "id": 3,
         "positions": []
      },
      {
         "id": 5,
         "positions": [{
            "id": 7,
            "goals": [{
               "id": 13,
            }]
         }]
      },
      {
         "id": 1,
         "positions": [{
               "id": 1,
               "goals": [{
                     "id": 16,
                  },
                  {
                     "id": 15,
                  },
                  {
                     "id": 14,
                  }
               ]
            },
            {
               "id": 2,
               "goals": [{
                  "id": 4,
               }]
            }
         ]
      }
   ]
}

First, I believe that the line below should not be part of the query as it will create a cartesian product. Look for sqlalchemy warnings when executing the query:

.select_from(Player, Position, Goal) # DELETE this as it creates cartesian product

Second, you can simplify your original query somewhat. Below produces a query equivalent to the one in your question:

# Load every team together with its players, their positions and all goals.
query1 = select(Team).options(
    # `.select_from(Player, Position, Goal)` is deliberately left out here:
    # DELETE it from the original query as it creates a cartesian product.
    joinedload(Team.players)
    .joinedload(Player.positions)
    .joinedload(Position.goals)
)

Above query can also be implemented differently by a) joining the related tables explicitly, and b) hinting to sqlalchemy that the query already contains the desired relationships:

# Same result as joined eager loading, but the joins are written explicitly
# and `contains_eager` tells the ORM to populate the relationships from them.
query2 = (
    select(Team)
    .outerjoin(Team.players)
    .outerjoin(Player.positions)
    .outerjoin(Position.goals)
    .options(
        contains_eager(Team.players, Player.positions, Position.goals)
    )
)

which produces the following SQL (sqlite):

SELECT goal.id,
   goal.distance,
   goal.position_id,
   position.id AS id_1,
   position.name,
   position.player_id,
   player.id AS id_2,
   player.name AS name_1,
   player.team_id,
   team.id AS id_3,
   team.name AS name_2
FROM team
LEFT OUTER JOIN player ON team.id = player.team_id
LEFT OUTER JOIN position ON player.id = position.player_id
LEFT OUTER JOIN goal ON goal.id =
   (SELECT goal.id AS last_goal_id FROM goal WHERE goal.position_id = position.id ORDER BY goal.id DESC LIMIT 1)

What you could also do is create a hybrid_property to get a computed column which points to the last Goal.id per Position, and use it to define a relationship which will only contain the last Goal in the list:

class Position(Base):
    """Position model with an extra view-only relationship to its newest goal.

    ``last_goal_id`` is a hybrid property whose SQL expression is a correlated
    scalar subquery selecting the highest ``Goal.id`` for the position.
    ``last_goals`` joins on that expression, so the collection holds only the
    goal with that id.
    """

    __tablename__ = "position"

    id = Column(Integer(), primary_key=True)
    name = Column(String(255), unique=True)
    player_id = Column(Integer, ForeignKey("player.id"))
    goals = relationship("Goal", backref="position")

    @hybrid_property
    def last_goal_id(self):
        # Python-side (instance level) value is left unimplemented in this
        # example; only the SQL expression below is used.
        ...

    @last_goal_id.expression
    def last_goal_id(cls):
        """Return a scalar subquery yielding the newest goal id per position."""
        stmt = (
            select(Goal.id.label("last_goal_id"))
            # equivalent to: .filter(Goal.position_id == Position.id)
            .filter(Goal.position_id == cls.id)
            .order_by(Goal.id.desc())
            .limit(1)
            .scalar_subquery()
            # correlate to the enclosing Position row
            # (alternative: .correlate_except(Goal))
            .correlate(cls)
        )
        return stmt

    # Lambdas defer evaluation until `Goal` and `Position.last_goal_id` exist.
    last_goals = relationship(
        lambda: Goal,
        primaryjoin=lambda: and_(
            Goal.position_id == Position.id,
            Goal.id == Position.last_goal_id,
        ),
        viewonly=True,
        uselist=True,
    )

Something like this:

# Rank goals by id within each position (rank 1 == newest goal)
subquery = select(
    Goal.id.label('goal_id'),
    Goal.position_id,
    func.rank()
    .over(order_by=Goal.id.desc(), partition_by=Goal.position_id)
    .label('rank'),
).subquery()

# Create dict of {position_id: latest_goal_id} to use as a lookup
latest_goal_query = (
    select(subquery.c.goal_id, subquery.c.position_id)
    .where(subquery.c.rank == 1)
)
latest_goal_ids = {
    pos_id: goal_id
    for goal_id, pos_id in session.execute(latest_goal_query).fetchall()
}

# Get goal objects from the IDs
goal_query = select(Goal).where(Goal.id.in_(list(latest_goal_ids.values())))
goals = {goal.id: goal for goal in session.execute(goal_query).scalars()}

# Map position ID to the latest goal object
latest_goals = {
    pos_id: goals[goal_id]
    for pos_id, goal_id in latest_goal_ids.items()
}

# Read the team and position, and you can use the position_id to get the latest goal
query = ...

Edit: column_property just came to mind as an alternative solution. Unfortunately I've never been able to figure out how to map the actual Goal model, so this isn't perfect, but here's an example of how you could add the ID of the latest goal directly to the Player model.

class Player(Base):
    ...
    # Read-only column holding the id of the player's newest goal.
    # NOTE(review): `id` is presumably the primary-key column declared in the
    # elided part of the class body above — confirm it is defined before this.
    latest_goal_id = column_property(
        select(Goal.id)
        .where(Goal.position.has(Position.player_id == id))
        .order_by(Goal.id.desc())
        .limit(1)
        # column_property expects a column expression, not a bare SELECT
        .scalar_subquery()
    )

Suggestion : 2

May 19, 2022 , Start date May 19, 2022

class Team(Base):
    """Team model; `players` holds the related Player rows."""

    # NOTE(review): no __tablename__ is shown in this excerpt — presumably
    # supplied by Base or omitted from the question; confirm.
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

    # Also adds a `team` attribute on Player via the backref.
    players = relationship("Player", backref="team")

class Player(Base):
    """Player model; belongs to a team and owns a collection of positions."""

    # NOTE(review): no __tablename__ is shown in this excerpt — presumably
    # supplied by Base or omitted from the question; confirm.
    id = Column(Integer, primary_key=True)
    name = Column(String(255), unique=True)
    team_id = Column(Integer, ForeignKey("team.id"))
    # Also adds a `player` attribute on Position via the backref.
    positions = relationship("Position", backref="player")

class Position(Base):
    """Position model; belongs to a player and owns a collection of goals."""

    # NOTE(review): no __tablename__ is shown in this excerpt — presumably
    # supplied by Base or omitted from the question; confirm.
    id = Column(Integer(), primary_key=True)
    name = Column(String(255), unique=True)
    player_id = Column(Integer, ForeignKey("player.id"))
    # Also adds a `position` attribute on Goal via the backref.
    goals = relationship("Goal", backref="position")

class Goal(Base):
    """Goal model; each goal references the position it was scored from."""

    # NOTE(review): no __tablename__ is shown in this excerpt — presumably
    # supplied by Base or omitted from the question; confirm.
    id = Column(Integer(), primary_key=True)
    distance = Column(Integer)
    position_id = Column(Integer, ForeignKey("position.id"))
# Query to get all goals of all players of a team
query = (
      select(Team)
      .select_from(Player, Position, Goal)
      .options(joinedload(Team.players))
      .options(
         joinedload(
            Team.players,
            Player.positions,
         )
      )
      .options(
         joinedload(
            Team.players,
            Player.positions,
            Position.goals,
         )
      ) result = await db.execute(query) response = result.scalar()
{
   "id": 3,
   "players": [{
         "id": 3,
         "positions": []
      },
      {
         "id": 5,
         "positions": [{
            "id": 7,
            "goals": [{
               "id": 13,
            }]
         }]
      },
      {
         "id": 1,
         "positions": [{
               "id": 1,
               "goals": [{
                     "id": 16,
                  },
                  {
                     "id": 15,
                  },
                  {
                     "id": 14,
                  }
               ]
            },
            {
               "id": 2,
               "goals": [{
                  "id": 4,
               }]
            }
         ]
      }
   ]
}
# Single newest goal (highest id), wrapped as a LATERAL subquery.
newest_goal = select(Goal).order_by(Goal.id.desc()).limit(1)
subquery = newest_goal.subquery().lateral()

query = (
      select(Team)
      .select_from(Player, Position, Goal)
      .options(joinedload(Team.players))
      .options(
         joinedload(
            Team.players,
            Player.positions,
         )
      )
      .outerjoin(subquery)
      .options(
         contains_eager(
            Team.players,
            Player.positions,
            Position.goals,
            alias = subquery,
         )
      ) result = await db.execute(query) response = result.scalar()
{
   "id": 3,
   "players": [{
         "id": 3,
         "positions": []
      },
      {
         "id": 5,
         "positions": [{
            "id": 7,
            "goals": [{
               "id": 16,
            }]
         }]
      },
      {
         "id": 1,
         "positions": [{
               "id": 1,
               "goals": [{
                  "id": 16,
               }]
            },
            {
               "id": 2,
               "goals": [{
                  "id": 16,
               }]
            }
         ]
      }
   ]
}
cat /etc/resolv.conf
nameserver 127.0.0.1

Suggestion : 3

How to limit N results per `group_by` in SQLAlchemy/Postgres?,How to get top n results per group from a pool of ids in SQLAlchemy?,How to group results to boolean value in PostgreSQL,How to get multiple results from group by in postgres

Adapting that to SQLAlchemy is straightforward using the function element's over() method to produce a window expression:

# Distinct medium-contact ids unnested from the selected contacts.
medium_contact_id_subq = (
    g.session.query(func.unnest(FUContact.medium_contact_id_lis).distinct())
    .filter(FUContact._id.in_(contact_id_lis))
    .subquery()
)

# Do the filtering inside the subquery and number the rows per contact.
# Pick a suitable ordering, otherwise the results are indeterminate.
subq = (
    g.session.query(
        FUMessage,
        func.row_number()
        .over(
            partition_by=FUMessage.fu_medium_contact_id,
            order_by=FUMessage.timestamp_utc,
        )
        .label('n'),
    )
    .filter(FUMessage.fu_medium_contact_id.in_(medium_contact_id_subq))
    .subquery()
)

fumessage_alias = aliased(FUMessage, subq)

# row_number() starts at 1, so keep rows whose number is less than or
# equal to the limit.
q = g.session.query(fumessage_alias).filter(subq.c.n <= MESSAGE_LIMIT)

Suggestion : 4

A query which makes use of subqueryload() in conjunction with a limiting modifier such as Query.first(), Query.limit(), or Query.offset() should always include Query.order_by() against unique column(s) such as the primary key, so that the additional queries emitted by subqueryload() include the same ordering as used by the parent query. Without it, there is a chance that the inner query could return the wrong rows:,The option does not supersede loader options stated in the query, such as eagerload(), subqueryload(), etc. The query below will still use joined loading for the widget relationship:,Each of joinedload(), subqueryload(), lazyload(), and noload() can be used to set the default style of relationship() loading for a particular query, affecting all relationship() -mapped attributes not otherwise specified in the Query. This feature is available by passing the string '*' as the argument to any of these options:,When querying, all three choices of loader strategy are available on a per-query basis, using the joinedload(), subqueryload() and lazyload() query options:

sql>>> jack.addresses
SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address,
addresses.user_id AS addresses_user_id
FROM addresses
WHERE ? = addresses.user_id
[5]
[<Address(u'jack@google.com')>, <Address(u'j25@yahoo.com')>]
sql >>> jack = session.query(User).\
   ...options(joinedload('addresses')).\
   ...filter_by(name = 'jack').all() #doctest: +NORMALIZE_WHITESPACE
SELECT addresses_1.id AS addresses_1_id, addresses_1.email_address AS addresses_1_email_address,
   addresses_1.user_id AS addresses_1_user_id, users.id AS users_id, users.name AS users_name,
   users.fullname AS users_fullname, users.password AS users_password
FROM users LEFT OUTER JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id
WHERE users.name = ? ['jack']
sql >>> jack = session.query(User).\
   ...options(subqueryload('addresses')).\
   ...filter_by(name = 'jack').all()
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
   users.password AS users_password
FROM users
WHERE users.name = ?
   ('jack', )
SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address,
   addresses.user_id AS addresses_user_id, anon_1.users_id AS anon_1_users_id
FROM(SELECT users.id AS users_id FROM users WHERE users.name = ? ) AS anon_1 JOIN addresses ON anon_1.users_id = addresses.user_id
ORDER BY anon_1.users_id, addresses.id('jack', )
# load the 'children' collection using LEFT OUTER JOIN
class Parent(Base):
    """Parent whose `children` relationship defaults to joined eager loading."""

    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    children = relationship("Child", lazy='joined')
# load the 'children' collection using a second query which
# JOINS to a subquery of the original
class Parent(Base):
    """Parent whose `children` relationship defaults to subquery eager loading."""

    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    children = relationship("Child", lazy='subquery')
# Per-query loader options for the 'children' relationship.

# set children to load lazily
session.query(Parent).options(lazyload('children')).all()

# set children to load eagerly with a join
session.query(Parent).options(joinedload('children')).all()

# set children to load eagerly with a second statement
session.query(Parent).options(subqueryload('children')).all()

Suggestion : 5

March 09, 2020Pauline Huguenel7 min read

relationship(ProjectModel, lazy = "select")
class Parent(Base):
    """Parent with a `child` relationship using the "select" lazy setting."""

    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    child_id = Column(Integer, ForeignKey('child.id'))
    child = relationship("Child", lazy="select")


# emits a SELECT statement to get the parent object
parent = session.query(Parent).first()

# emits a second SELECT statement to get the child object
child = parent.child
class Parent(Base):
    """Parent with a `child` relationship using the "joined" lazy setting."""

    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    child_id = Column(Integer, ForeignKey('child.id'))
    child = relationship("Child", lazy="joined")


# emits a SELECT statement to get the parent object and its children
parent = session.query(Parent).first()

# does not emit a second SELECT statement as child object is already loaded
child = parent.child
class Person(Base):
    """Self-referential model: each person may reference a manager."""

    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)
    manager_id = Column(Integer, ForeignKey('person.id'))
    # Self-referential relationships default to one-to-many; `remote_side`
    # makes `manager` the many-to-one side (a single Person).
    manager = relationship(
        "Person", lazy="joined", join_depth=1, remote_side=[id]
    )


person = session.query(Person).first()
person.manager  # eager-loaded (one level deep)
person.manager.manager  # lazy-loaded (two levels deep)
class Person(Base):
    """Person with an address and an optional manager (another Person)."""

    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)
    firstname = Column(Text)
    lastname = Column(Text)

    address_id = Column(Integer, ForeignKey('address.id'))
    address = relationship('Address')

    # The foreign key target must be the string 'person.id'; the bare name
    # `person` is not defined anywhere.
    manager_id = Column(Integer, ForeignKey('person.id'))
    # Self-referential relationships default to one-to-many; `remote_side`
    # makes `manager` the many-to-one side (a single Person).
    manager = relationship('Person', remote_side=[id])

class Address(Base):
    """Postal address referenced by Person.address_id."""

    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    number = Column(Integer)
    street = Column(Text)
    city = Column(Text)
    country = Column(Text)
Person.query.options(joinedload("manager"))

Suggestion : 6

sqlalchemy.orm.relationship()

def find_user(self, case_insensitive=False, **kwargs):
    """Return the first user matching the given attribute filters.

    :param case_insensitive: when true, compare the single given attribute
        against its value using SQL ``lower()`` on both sides.
    :param kwargs: attribute/value pairs to filter on.
    :raises ValueError: if ``case_insensitive`` is set and more than one
        key is supplied.
    :return: the result of ``.first()`` on the filtered query.
    """
    from sqlalchemy import func as alchemyFn

    query = self.user_model.query
    if config_value("JOIN_USER_ROLES") and hasattr(self.user_model, "roles"):
        from sqlalchemy.orm import joinedload

        # Eager-load roles in the same SELECT.
        query = query.options(joinedload("roles"))

    if case_insensitive:
        # While it is of course possible to pass in multiple keys to filter on
        # that isn't the normal use case. If caller asks for case_insensitive
        # AND gives multiple keys - throw an error.
        if len(kwargs) > 1:
            raise ValueError("Case insensitive option only supports single key")
        attr, identifier = kwargs.popitem()
        subquery = alchemyFn.lower(
            getattr(self.user_model, attr)
        ) == alchemyFn.lower(identifier)
        return query.filter(subquery).first()
    else:
        return query.filter_by(**kwargs).first()
def get_alias_infos_with_pagination(user, page_id=0, query=None) -> [AliasInfo]:
    """Return one page of alias infos for *user*, newest aliases first.

    :param user: owner of the aliases; only ``user.id`` is read.
    :param page_id: zero-based page index; the page size is ``PAGE_LIMIT``.
    :param query: optional search text matched case-insensitively against
        the alias email and note.
    :return: list of ``get_alias_info(alias)`` results for the page.
    """
    q = (
        db.session.query(Alias)
        .options(joinedload(Alias.mailbox))
        .filter(Alias.user_id == user.id)
        .order_by(Alias.created_at.desc())
    )

    if query:
        q = q.filter(
            or_(Alias.email.ilike(f"%{query}%"), Alias.note.ilike(f"%{query}%"))
        )

    q = q.limit(PAGE_LIMIT).offset(page_id * PAGE_LIMIT)

    return [get_alias_info(alias) for alias in q]
def get_name_for_netloc_db(db_sess, netloc):
    """Look up the feed name mapped to *netloc*, caching successful lookups.

    Returns the cached or freshly queried feed name, or ``False`` when no
    mapping row exists or the mapped feed has an empty name.
    """
    if netloc in FEED_LOOKUP_CACHE:
        return FEED_LOOKUP_CACHE[netloc]

    row = (
        db_sess.query(db.RssFeedUrlMapper)
        .filter(db.RssFeedUrlMapper.feed_netloc == netloc)
        .options(joinedload('feed_entry'))
        .all()
    )

    if not row:
        return False

    if len(row) > 1:
        # Ambiguous mapping: report it, then fall through and use the first row.
        print("ERROR: Multiple solutions for netloc %s?" % netloc)

    feedname = row[0].feed_entry.feed_name
    if feedname:
        FEED_LOOKUP_CACHE[netloc] = feedname
        return feedname
    else:
        return False
def renderFeedsTable(page=1):
    """Render one page of RSS feed posts, newest published first.

    :param page: page number handed to ``paginate``.
    :return: the rendered ``rss-pages/feeds.html`` template, or a redirect
        back to this view when the query object is ``None``.
    """
    feeds = (
        g.session.query(db.RssFeedPost)
        .order_by(desc(db.RssFeedPost.published))
    )

    # Eager-load the tag and author relationships with JOINs.
    feeds = feeds.options(joinedload('tag_rel'))
    feeds = feeds.options(joinedload('author_rel'))

    if feeds is None:
        flash('No feeds? Something is /probably/ broken!.')
        return redirect(url_for('renderFeedsTable'))

    feed_entries = paginate(feeds, page, app.config['FEED_ITEMS_PER_PAGE'])

    return render_template(
        'rss-pages/feeds.html',
        subheader="",
        sequence_item=feed_entries,
        page=page,
    )
def get_current_game(self, db_session, with_bets=False, with_users=False):
    """Return the running HSBetGame, creating and flushing one if none exists.

    :param db_session: session used to query, add and flush.
    :param with_bets: eager-load ``HSBetGame.bets`` with a JOIN.
    :param with_users: eager-load each bet's ``user`` as well.
    """
    query = db_session.query(HSBetGame).filter(HSBetGame.is_running)

    # with_bets and with_users are just optimizations for the querying.
    # If a code path knows it's going to need to load the bets and users for
    # each bet, we can load them eagerly with a proper SQL JOIN instead of
    # lazily later, to make that code path faster
    if with_bets:
        query = query.options(joinedload(HSBetGame.bets))
    if with_users:
        query = query.options(joinedload(HSBetGame.bets).joinedload(HSBetBet.user))

    current_game = query.one_or_none()
    if current_game is None:
        current_game = HSBetGame()
        db_session.add(current_game)
        db_session.flush()  # so we get current_game.id set
    return current_game
def with_(cls, schema):
    """
    Query class and eager load schema at once.

    :type schema: dict

    Example:
        schema = {
            'user': JOINED,  # joinedload user
            'comments': (SUBQUERY, {  # load comments in separate query
                'user': JOINED  # but, in this separate query, join user
            })
        }
        # the same schema using class properties:
        schema = {
            Post.user: JOINED,
            Post.comments: (SUBQUERY, {
                Comment.user: JOINED
            })
        }
        User.with_(schema).first()
    """
    # A falsy schema (e.g. None) is treated as an empty one.
    return cls.query.options(*eager_expr(schema or {}))