sqlalchemy: add into mysql table new rows from pandas dataframe, if they don't exist already in the table

Suggestion : 1

You could pull the results already in the database into a new dataframe and then compare the two dataframes. After that, you would insert only the rows that are not yet in the table. Not knowing the format of your table or data, I'm just using a generic SELECT statement here.

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('mysql://usr:psw@ip/schema')
con = engine.connect()

# Pull the rows already in the database
sql = "SELECT * FROM table_name"
old_results = pd.read_sql(sql, con)

# Outer merge with an indicator column, then keep only the rows that
# appear in the new data ('right_only') but not already in the table
df = pd.merge(old_results, results_final, how='outer', indicator=True)
new_results = df[df['_merge'] == 'right_only'][results_final.columns]

new_results.to_sql(name='error', con=con, if_exists='append')
con.close()
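
If comparing entire rows is too strict (for example, when non-key columns may legitimately change), a variation is to filter on a key column alone. A minimal sketch, where `id` stands in for whatever your table's key column actually is:

# Keep only the rows whose key is not already present in the table
new_results = results_final[~results_final['id'].isin(old_results['id'])]
new_results.to_sql(name='error', con=con, if_exists='append')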

I developed this function to handle both cases: new values, and columns in the source table that don't match the target table.

def load_data(df):
    engine = create_engine('mysql+pymysql://root:pass@localhost/dw',
                           echo_pool=True, pool_size=10, max_overflow=20)
    with engine.connect() as conn, conn.begin():
        try:
            df_old = pd.read_sql('SELECT * FROM table', conn)

            # Check if there are new rows to be inserted
            if len(df) > len(df_old) or df.disconnected_time.max() > df_old.disconnected_time.max():
                print("There are new rows to be inserted.")
                df_merged = pd.merge(df_old, df, how='outer', indicator=True)
                df_final = df_merged[df_merged['_merge'] == 'right_only'][df.columns]
                df_final.to_sql(name='table', con=conn, index=False, if_exists='append')
        except Exception as err:
            print(str(err))
        else:
            # This handles the case where the number of columns
            # is not equal to the target table
            if df.shape[1] > df_old.shape[1]:
                data = pd.read_sql('SELECT * FROM table', conn)
                df2 = pd.concat([df, data])
                df2.to_sql('table', conn, index=False, if_exists='replace')

        outcome = conn.execute("SELECT COUNT(1) FROM table")
        count_row = outcome.first()[0]
        return print(f"Total of {count_row} rows loaded.")
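
For example, assuming `results_final` from the first snippet is the incoming DataFrame:

# Insert only the genuinely new rows and print the resulting row count
load_data(results_final)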

Suggestion : 2

MySQL provides a number of useful statements for INSERTing rows after determining whether a row is, in fact, new or already exists. Below we'll examine three different methods in turn and explain the pros and cons of each, so you have a firm grasp on how to configure your own statements when providing new or potentially existing data for insertion. One caution about REPLACE: even if only one row is altered, the result will indicate that two rows were affected, because when a matching data row is found the existing row is deleted with a standard DELETE statement, then a normal INSERT is performed afterward.

mysql> SELECT * FROM books LIMIT 3;
+----+------------------------+---------------------+----------------+
| id | title                  | author              | year_published |
+----+------------------------+---------------------+----------------+
|  1 | In Search of Lost Time | Marcel Proust       |           1913 |
|  2 | Ulysses                | James Joyce         |           1922 |
|  3 | Don Quixote            | Miguel de Cervantes |           1605 |
+----+------------------------+---------------------+----------------+
3 rows in set (0.00 sec)

mysql> INSERT INTO books (id, title, author, year_published)
    -> VALUES (1, 'Green Eggs and Ham', 'Dr. Seuss', 1960);
ERROR 1062 (23000): Duplicate entry '1' for key 'PRIMARY'

mysql> INSERT IGNORE INTO books (id, title, author, year_published)
    -> VALUES (1, 'Green Eggs and Ham', 'Dr. Seuss', 1960);
Query OK, 0 rows affected (0.00 sec)

mysql> REPLACE INTO books (id, title, author, year_published)
    -> VALUES (1, 'Green Eggs and Ham', 'Dr. Seuss', 1960);
Query OK, 2 rows affected (0.00 sec)

mysql> SET @id = 1,
    ->     @title = 'In Search of Lost Time',
    ->     @author = 'Marcel Proust',
    ->     @year_published = 1913;

mysql> INSERT INTO books (id, title, author, year_published)
    -> VALUES (@id, @title, @author, @year_published)
    -> ON DUPLICATE KEY UPDATE
    ->     title = @title,
    ->     author = @author,
    ->     year_published = @year_published;

mysql> SELECT * FROM books LIMIT 1;
+----+------------------------+---------------+----------------+
| id | title                  | author        | year_published |
+----+------------------------+---------------+----------------+
|  1 | In Search of Lost Time | Marcel Proust |           1913 |
+----+------------------------+---------------+----------------+
1 row in set (0.00 sec)
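
Back in pandas, these statements can be combined with to_sql() through its documented `method` callable. A minimal sketch, assuming the books table above with its PRIMARY KEY in place; `mysql_upsert` is an illustrative name and the connection URL is a placeholder:

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.mysql import insert

def mysql_upsert(table, conn, keys, data_iter):
    # pandas passes the target table, a live connection, the column
    # names, and an iterator of row tuples into this hook
    rows = [dict(zip(keys, row)) for row in data_iter]
    stmt = insert(table.table).values(rows)
    # On a key collision, overwrite every column with the incoming value
    stmt = stmt.on_duplicate_key_update({col: stmt.inserted[col] for col in keys})
    conn.execute(stmt)

engine = create_engine('mysql+pymysql://usr:psw@ip/schema')  # placeholder URL
df = pd.DataFrame([{'id': 1, 'title': 'Green Eggs and Ham',
                    'author': 'Dr. Seuss', 'year_published': 1960}])
df.to_sql('books', engine, if_exists='append', index=False, method=mysql_upsert)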

Suggestion : 3

This suggestion is drawn from a GitHub discussion about adding upsert support to pandas' to_sql(). From the thread: "I personally don't have anything against it, so I think a PR is welcome. One implementation across all DBMSs using SQLAlchemy core is certainly how this should start if I am reading your points correctly, and the same with just primary keys." @cristianionescu92's example below uses a table called User with the fields id and name.

Suppose you have an existing SQL table called person_age, where id is the primary key:

    age
id
1    18
2    42

and you also have new data in a DataFrame called extra_data:

    age
id
2    44
3    95

Expected output:

    age
id
1    18
2    44
3    95
The proposal defines two upsert methods (a minimal pandas sketch of both follows this list):
  1. upsert_update - on row match, update the row in the database (for knowingly updating records - represents most use cases)
  2. upsert_ignore - on row match, do not update the row in the database (for cases where datasets have overlap, and you do not want to override data in tables)
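
A minimal pandas-only sketch of the two behaviours on the person_age/extra_data example above; combine_first is used here as a stand-in for illustration, not as the proposed implementation:

import pandas as pd

person_age = pd.DataFrame({'age': [18, 42]}, index=pd.Index([1, 2], name='id'))
extra_data = pd.DataFrame({'age': [44, 95]}, index=pd.Index([2, 3], name='id'))

# upsert_update: incoming values win where ids collide -> 18, 44, 95
print(extra_data.combine_first(person_age))

# upsert_ignore: existing values win where ids collide -> 18, 42, 95
print(person_age.combine_first(extra_data))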
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("connection string")
df = pd.DataFrame(...)

df.to_sql(
    name='table_name',
    con=engine,
    if_exists='append',
    method='upsert_update'  # (or 'upsert_ignore')
)

To implement this, the SQLTable class would receive two new private methods containing the upsert logic, which would be called from the SQLTable.insert() method:

def insert(self, chunksize=None, method=None):

    # set insert method
    if method is None:
        exec_insert = self._execute_insert
    elif method == "multi":
        exec_insert = self._execute_insert_multi
    # new upsert methods <<<
    elif method == "upsert_update":
        exec_insert = self._execute_upsert_update
    elif method == "upsert_ignore":
        exec_insert = self._execute_upsert_ignore
    # >>>
    elif callable(method):
        exec_insert = partial(method, self)
    else:
        raise ValueError("Invalid parameter 'method': {}".format(method))

    ...

There have been some good discussions around the API and how an upsert should actually be called (i.e. via the if_exists argument, or via an explicit upsert argument); this will be clarified soon. For now, this is the pseudocode proposal for how the functionality would work using the SQLAlchemy upsert statement:

Identify primary key(s) and existing pkey values from DB table
    (if no primary key constraints are identified but upsert is called, return an error)

Make a temp copy of the incoming DataFrame

Identify records in incoming DataFrame with matching primary keys

Split temp DataFrame into records which have a primary key match, and records which don't

if upsert_update:
    Update the DB table using `update` for only the rows which match
else:
    Ignore rows from DataFrame with matching primary key values
finally:
    Append remaining DataFrame rows with non-matching values in the primary key column to the DB table
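
As a rough illustration of the split step, a minimal pandas sketch; `df`, `existing`, and `pkeys` are illustrative names for the incoming DataFrame, the primary-key values already in the table, and the list of primary-key columns:

import pandas as pd

def split_by_pkey(df, existing, pkeys):
    # Tag each incoming row by whether its key combination already
    # exists in the target table
    merged = df.merge(existing[pkeys], on=pkeys, how='left', indicator=True)
    matched = merged[merged['_merge'] == 'both'][df.columns]
    new = merged[merged['_merge'] == 'left_only'][df.columns]
    return matched, new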

With wildly varying syntax, I understand the temptation to use DELETE … INSERT to make the implementation dialect agnostic. But there’s another way: we can imitate the logic of the MERGE statement using a temp table and basic INSERT and UPDATE statements. The SQL:2016 MERGE syntax is as follows:

MERGE INTO target_table
USING source_table
ON search_condition
WHEN MATCHED THEN
    UPDATE SET col1 = value1, col2 = value2, ...
WHEN NOT MATCHED THEN
    INSERT (col1, col2, ...)
    VALUES (value1, value2, ...);

Below is a proof of concept I wrote for MySQL:

import uuid

import pandas as pd
from sqlalchemy import create_engine

# This proof of concept uses this sample database:
# https://downloads.mysql.com/docs/world.sql.zip

# Arbitrary, unique temp table name to avoid possible collision
source = str(uuid.uuid4()).split('-')[-1]

# Table we're doing our upsert against
target = 'countrylanguage'

db_url = 'mysql://<user>:<password>@<host>/<db>'

df = pd.read_sql(f'SELECT * FROM `{target}`;', db_url)

# Change for UPDATE, 5.3 -> 5.4
df.at[0, 'Percentage'] = 5.4
# Change for INSERT
df = df.append(
    {
        'CountryCode': 'ABW',
        'Language': 'Arabic',
        'IsOfficial': 'F',
        'Percentage': 0.0
    },
    ignore_index=True
)

# List of PRIMARY or UNIQUE keys
key = ['CountryCode', 'Language']

# Do all of this in a single transaction
engine = create_engine(db_url)
with engine.begin() as con:
    # Create temp table like target table to stage data for upsert
    con.execute(f'CREATE TEMPORARY TABLE `{source}` LIKE `{target}`;')
    # Insert dataframe into temp table
    df.to_sql(source, con, if_exists='append', index=False, method='multi')
    # INSERT where the key doesn't match (new rows)
    con.execute(f'''
        INSERT INTO `{target}`
        SELECT *
        FROM `{source}`
        WHERE (`{'`, `'.join(key)}`) NOT IN
            (SELECT `{'`, `'.join(key)}` FROM `{target}`);
    ''')
    # Create a doubled list of tuples of non-key columns
    # to template the update statement
    non_key_columns = [(i, i) for i in df.columns if i not in key]
    # Whitespace for aesthetics
    whitespace = '\n\t\t\t'
    # Do an UPDATE...JOIN to set all non-key columns of target to equal source
    con.execute(f'''
        UPDATE `{target}` `t`
        JOIN `{source}` `s`
        ON `t`.`{"` AND `t`.`".join(["` = `s`.`".join(i) for i in zip(key, key)])}`
        SET `t`.`{f"`, {whitespace}`t`.`".join(["` = `s`.`".join(i) for i in non_key_columns])}`;
    ''')
    # Drop our temp table
    con.execute(f'DROP TABLE `{source}`;')
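
A quick sanity check after the transaction, reusing `engine` and `target` from the proof of concept (the rows queried are just the ones changed above):

with engine.begin() as con:
    # The ABW/Arabic row should now exist, and row 0's
    # Percentage should read 5.4
    print(pd.read_sql(f"SELECT * FROM `{target}` WHERE `CountryCode` = 'ABW';", con))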

Let’s also assume that we have a session variable open to access the database. By calling this method:

session.bulk_update_mappings(
    User,
    <pandas dataframe above>.to_dict(orient='records')
)
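
Filled out, a minimal sketch of that pattern; the `engine`, the mapped User class, and the example DataFrame are assumptions standing in for the placeholders above:

import pandas as pd
from sqlalchemy.orm import Session

# Each record must include the mapped primary key (here, id)
df = pd.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob']})

with Session(engine) as session:
    # Each dict becomes an UPDATE keyed on the primary key;
    # rows that don't already exist are not inserted
    session.bulk_update_mappings(User, df.to_dict(orient='records'))
    session.commit()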

@cvonsteg originally proposed using method=, which would avoid the ambiguity of having two meanings for if_exists.

df.to_sql(
    name='table_name',
    con=engine,
    if_exists='append',
    method='upsert_update'  # (or 'upsert_ignore')
)

Suggestion : 4

Now that we have a "base", we can define any number of mapped classes in terms of it. We will start with a single table called users, which will store records for the end users of our application. A new class called User will be the class to which we map this table. Within the class, we define details about the table to which we'll be mapping: primarily the table name, and the names and datatypes of columns. Outside of what the mapping process does to our class, the class remains otherwise mostly a normal Python class, to which we can define any number of ordinary attributes and methods needed by our application. Next, consider how a second table, related to User, can be mapped and queried. Users in our system can store any number of email addresses associated with their username. This implies a basic one-to-many association from users to a new table which stores email addresses, which we will call addresses. Using declarative, we define this table along with its mapped class, Address. We'll need to create the addresses table in the database, so we issue another CREATE from our metadata, which skips over tables that have already been created.

>>> import sqlalchemy
>>> sqlalchemy.__version__
'1.4.0'
>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite:///:memory:', echo=True)
>>> from sqlalchemy.orm import declarative_base
>>> Base = declarative_base()
>>> from sqlalchemy import Column, Integer, String
>>> class User(Base):
...     __tablename__ = 'users'
...
...     id = Column(Integer, primary_key=True)
...     name = Column(String)
...     fullname = Column(String)
...     nickname = Column(String)
...
...     def __repr__(self):
...         return "<User(name='%s', fullname='%s', nickname='%s')>" % (
...             self.name, self.fullname, self.nickname)
>>> User.__table__
Table('users', MetaData(),
    Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
    Column('name', String(), table=<users>),
    Column('fullname', String(), table=<users>),
    Column('nickname', String(), table=<users>), schema=None)
>>> Base.metadata.create_all(engine)
BEGIN...
CREATE TABLE users (
    id INTEGER NOT NULL,
    name VARCHAR,
    fullname VARCHAR,
    nickname VARCHAR,
    PRIMARY KEY (id)
)
[...]
COMMIT
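
A minimal sketch of the Address mapping described above, following the same tutorial's conventions (the exact columns are illustrative):

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship

class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    # Many addresses map back to one user
    user = relationship("User", back_populates="addresses")

User.addresses = relationship(
    "Address", order_by=Address.id, back_populates="user")

# create_all() skips tables that already exist, so only addresses is created
Base.metadata.create_all(engine)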

Suggestion : 5

Save Pandas DataFrames into SQL database tables, or create DataFrames from SQL, using Pandas' built-in SQLAlchemy integration. Pandas and SQLAlchemy are a match made in Python heaven: they're individually among Python's most frequently used libraries, and together they're greater than the sum of their parts thanks to Pandas' built-in SQLAlchemy integration. Loading data from a database into a Pandas DataFrame is surprisingly easy; to load an entire table, use the read_sql_table() method. In the other direction, every DataFrame has a built-in method called to_sql() which, as the name suggests, uploads the DataFrame to a SQL database as a SQL table. Let's see it in action.

postgresql+psycopg2://myuser:mypassword@hackersdb.example.com:5432/mydatabase
[DB_FLAVOR]+[DB_PYTHON_LIBRARY]://[USERNAME]:[PASSWORD]@[DB_HOST]:[PORT]/[DB_NAME]

from os import environ
from sqlalchemy import create_engine

db_uri = environ.get('SQLALCHEMY_DATABASE_URI')
self.engine = create_engine(db_uri, echo=True)
import pandas as pd

jobs_df = pd.read_csv('data/nyc-jobs.csv')

from sqlalchemy.types import Integer, Text, String, DateTime

...

jobs_df.to_sql(
    'nyc_jobs',
    engine,
    if_exists='replace',
    index=False,
    chunksize=500,
    dtype={
        "job_id": Integer,
        "agency": Text,
        "business_title": Text,
        "job_category": Text,
        "salary_range_from": Integer,
        "salary_range_to": Integer,
        "salary_frequency": String(50),
        "work_location": Text,
        "division/work_unit": Text,
        "job_description": Text,
        "posting_date": DateTime,
        "posting_updated": DateTime
    }
)
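
And to load a whole table back into a DataFrame, read_sql_table() does the job in one call (reusing the engine from above):

import pandas as pd

# Read the entire nyc_jobs table back into a DataFrame
jobs_df = pd.read_sql_table('nyc_jobs', con=engine)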
2020-06-11 23:49:21,082 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE 'sql_mode'
2020-06-11 23:49:21,082 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,396 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE 'lower_case_table_names'
2020-06-11 23:49:21,396 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,432 INFO sqlalchemy.engine.base.Engine SELECT DATABASE()
2020-06-11 23:49:21,432 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,470 INFO sqlalchemy.engine.base.Engine show collation where "Charset" = 'utf8mb4' and "Collation" = 'utf8mb4_bin'
2020-06-11 23:49:21,470 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,502 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS CHAR(60)) AS anon_1
2020-06-11 23:49:21,502 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,523 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS CHAR(60)) AS anon_1
2020-06-11 23:49:21,523 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,537 INFO sqlalchemy.engine.base.Engine SELECT CAST('test collated returns' AS CHAR CHARACTER SET utf8mb4) COLLATE utf8mb4_bin AS anon_1
2020-06-11 23:49:21,537 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,587 INFO sqlalchemy.engine.base.Engine DESCRIBE "nyc_jobs"
2020-06-11 23:49:21,588 INFO sqlalchemy.engine.base.Engine {}
2020-06-11 23:49:21,654 INFO sqlalchemy.engine.base.Engine ROLLBACK
2020-06-11 23:49:21,691 INFO sqlalchemy.engine.base.Engine
CREATE TABLE nyc_jobs (
    job_id INTEGER,
    agency TEXT,
    business_title TEXT,
    job_category TEXT,
    salary_range_from INTEGER,
    salary_range_to INTEGER,
    salary_frequency VARCHAR(50),
    work_location TEXT,
    division TEXT,
    job_description TEXT,
    created_at DATETIME,
    updated_at DATETIME
)