which sql database for tornado has an asynchronous driver?

  • Last Update :
  • Techknowledgy :

After coding this up, I found that this approach had roughly been done at the C level via this site: http://jan.kneschke.de/projects/mysql/async-mysql-queries-with-c-api/. So how do we do it? While looking into how the Python driver actually connects to MySQL, I noticed that it uses the mysql_real_query function call, as defined in sql-common/client.c of the MySQL development API. This particular function looks like this:,Normally, this tornado application would only be able to deliver 1 request per second, even with concurrent connections. Hitting this with apache benchmark to make many simultaneous connections:,Our approach requires modifying the (excellent) python mysql drivers to expose some functions from the mysql layer. First, we need the file descriptor of the database connection to be exposed, so we can poll it using the Tornado ioloop:

    /*
     * Excerpt from MySQL's sql-common/client.c: the blocking query entry
     * point used by the Python driver.  Internally it is two steps --
     * mysql_send_query() writes the query to the socket, then
     * read_query_result() blocks until the server replies.  Exposing the
     * two halves separately is what makes the async approach possible.
     */
    int
    mysql_real_query(MYSQL *mysql,
       const char *query, ulong length) {
       DBUG_ENTER("mysql_real_query");
       DBUG_PRINT("enter", ("handle: 0x%lx", (long) mysql));
       DBUG_PRINT("query", ("Query = '%-.4096s'", query));

       /* Step 1: send the query; non-zero means the write itself failed. */
       if (mysql_send_query(mysql, query, length))
          DBUG_RETURN(1);
       /* Step 2: block waiting for the server's reply. */
       DBUG_RETURN((int) (*mysql->methods->read_query_result)(mysql));
    }
    static MyMemberlist(_mysql_ConnectionObject_memberlist)[]
    MyMember(
       "fd",
       T_UINT,
       offsetof(_mysql_ConnectionObject, connection.net.fd),
       RO,
       "File descriptor of the server connection"
    ),
    /*
     * New driver method: send_query(query) -- writes the query to the
     * server socket WITHOUT waiting for the reply (the blocking half is
     * read_query_result below).  Returns None; raises the driver
     * exception if the send fails.
     */
    static PyObject *
    _mysql_ConnectionObject_send_query(
       _mysql_ConnectionObject *self,
       PyObject *args) {
       char *query;
       int len, r;
       if (!PyArg_ParseTuple(args, "s#:query", &query, &len))
          return NULL;
       check_connection(self);
       r = mysql_send_query(&(self->connection), query, len);
       if (r)
          return _mysql_Exception(self);
       Py_INCREF(Py_None);
       return Py_None;
    }

    /*
     * New driver method: read_query_result() -- the blocking second half
     * of a query started with send_query().  Call it once the socket is
     * readable so the read returns immediately.  Returns None; raises the
     * driver exception on server-side error.
     */
    static PyObject *
    _mysql_ConnectionObject_read_query_result(
       _mysql_ConnectionObject *self,
       PyObject *args) {
       int r;
       r = self->connection.methods->read_query_result(&(self->connection));
       if (r)
          return _mysql_Exception(self);
       Py_INCREF(Py_None);
       return Py_None;
    }
    /*
     * PyMethodDef entries registering the two new half-query methods on
     * the connection object (fragment of the driver's full method table).
     */
    {
       "send_query",
       (PyCFunction) _mysql_ConnectionObject_send_query,
       METH_VARARGS,
       _mysql_ConnectionObject_send_query__doc__
    }, {
       "read_query_result",
       (PyCFunction) _mysql_ConnectionObject_read_query_result,
       METH_VARARGS,
       _mysql_ConnectionObject_read_query_result__doc__
    },
    class NBConn(Connection):
        """Non-blocking basic select queries.

        Only the *query* phase is non-blocking: this assumes the database
        is churning on the query itself, not on producing rows or on
        establishing the connection.  Retrieving rows could potentially be
        made non-blocking as well.
        """

        def nb_query(self, query, callback, on_error=None, args=None):
            """Non-blocking query.

            ``callback`` is a function that takes a list of row tuples.
            ``on_error``, if given, is called with the exception instead of
            re-raising it.
            """
            # Fire the query at the server, then let the ioloop call us
            # back when the connection's fd is readable (results ready).
            self.send_query(query)
            ioloop.IOLoop.instance().add_handler(
                self.fd,
                self.cb_factory(callback, on_error),
                ioloop.IOLoop.READ)

        def cb_factory(self, callback, on_error=None):
            """Return a function that handles the ioloop callback."""
            def cb(fd, ev):
                res = []
                try:
                    self.read_query_result()
                    # Collect results
                    result = self.use_result()
                    while True:
                        row = result.fetch_row()
                        if not row:
                            break
                        res.append(row[0])
                    # Fire callback with results
                    callback(res)
                except Exception as e:
                    if on_error:
                        return on_error(e)
                    else:
                        # Bare raise preserves the original traceback.
                        raise
                finally:
                    # Always unregister the fd, even on error, so the
                    # ioloop never polls a dead connection.
                    self.nb_cleanup()
            return cb

        def nb_cleanup(self):
            """Stop watching the fd and close the connection."""
            ioloop.IOLoop.instance().remove_handler(self.fd)
            self.close()

Suggestion : 2
  • Python 3.6 or above
  • Motor: Motor is MongoDB driver for Python Tornado. The motor can be installed using pip:
$ pip install motor
  • Tornado: Tornado is a Python web framework.  The Tornado can be installed using pip:
$ pip install tornado
  • Nest_asyncio: Nest_asyncio patch asyncio to allow nested event loops. Nest_asyncio can be installed using pip:
$ pip install nest_asyncio
  • Dnspython: DNS toolkit to connect to DB cluster.  Dnspython can be installed using pip:
$ pip install dnspython

Image Snapshot of the connection string for MongoDB database cluster

'mongodb+srv://<UserName>:<Password>@<clustername>.gvtft.mongodb.net/test'

Suggestion : 3

Is there any async driver/module for MySQL which can be used on Tornado to support transactions ? I am writting Tornado app with MySQL as database. I have googled and found https://github.com/woshifyz/tornado-mysql,Is There Any Async Drivermodule For Mysql Which Can Be Used On Tornado To Support Transactions, 4 days ago Feb 23, 2022  · Integrating Tornado MongoDB can be tricky. But, Python async driver for MongoDB, Motor, comes with built-in support for Tornado. The Motor makes it simple to use MongoDB in Tornado regardless of the type of server you are building. MongoDB is a widely used Non-Relational database and Tornado is a popular asynchronous Python web server. , 1 week ago MySQL Python drivers + additions to work asynchronously in Tornado - GitHub - eliast/async-MySQL-python: MySQL Python drivers + additions to work asynchronously in Tornado ... Drizzle support probably won't happen in 1.2. There may be have to be an entirely different module, but still using DB-API. ... MySQL-shared to run if you compiled with ...

Fix transaction.execute args to kwargs

Suggestion : 4

Databases is suitable for integrating against any async Web framework, such as Starlette, Sanic, Responder, Quart, aiohttp, Tornado, or FastAPI.,Databases gives you simple asyncio support for a range of databases.,Check out the documentation on making database queries for examples of how to start using databases together with SQLAlchemy core expressions.,Note that if you are using any synchronous SQLAlchemy functions such as engine.create_all() or alembic migrations then you still have to install a synchronous DB driver: psycopg2 for PostgreSQL and pymysql for MySQL.

$ pip install databases
$ pip install databases[asyncpg]
$ pip install databases[aiopg]
$ pip install databases[aiomysql]
$ pip install databases[asyncmy]
$ pip install databases[aiosqlite]
$ pip install ipython
# Example usage of the `databases` package.  Wrapped in an async main()
# so it runs as a plain script; under IPython you could instead execute
# the statements with top-level `await`.
import asyncio

from databases import Database


async def main():
    # Create a database instance, and connect to it.
    database = Database('sqlite+aiosqlite:///example.db')
    await database.connect()

    # Create a table.
    query = """CREATE TABLE HighScores (id INTEGER PRIMARY KEY, name VARCHAR(100), score INTEGER)"""
    await database.execute(query=query)

    # Insert some data.
    query = "INSERT INTO HighScores(name, score) VALUES (:name, :score)"
    values = [
        {"name": "Daisy", "score": 92},
        {"name": "Neil", "score": 87},
        {"name": "Carol", "score": 43},
    ]
    await database.execute_many(query=query, values=values)

    # Run a database query.
    query = "SELECT * FROM HighScores"
    rows = await database.fetch_all(query=query)
    print('High Scores:', rows)


asyncio.run(main())

Suggestion : 5

The RethinkDB Python driver integrates with both the Tornado web framework and the Twisted networking engine. By using the set_loop_type command, you can select either the 'tornado' or 'twisted' event loop model, returning Tornado Future objects or Twisted Deferred objects respectively.,Before connect, use the set_loop_type("twisted") command to set RethinkDB to use asynchronous event loops compatible with the Twisted reactor.,Before connect, use the set_loop_type("tornado") command to set RethinkDB to use asynchronous event loops compatible with Tornado.,Due to its event-driven nature, JavaScript can easily execute RethinkDB queries in an asynchronous fashion. The official RethinkDB drivers currently support integration with EventMachine for Ruby, and Tornado and Twisted for Python.

# frozen_string_literal: true

require 'eventmachine'
require 'rethinkdb'
include RethinkDB::Shortcuts

conn = r.connect(host: 'localhost', port: 28015)

# Stream each row of the ordered table through the block as it arrives.
EventMachine.run {
  r.table('test').order_by(:index => 'id').em_run(conn) { |row|
    # do something with returned row data
    p row
  }
}
# Stop after printing three rows by closing the query handle early;
# em_run returns a handle whose #close cancels the stream.
EventMachine.run {
  printed = 0
  handle = r.table('test').order_by(:index => 'id').em_run(conn) { |row|
    printed += 1
    if printed > 3
      handle.close
    else
      p row
    end
  }
}
RethinkDB::ReqlRunTimeError: Table `test.test`
does not exist.
Backtrace:
r.table('test')
^^^^^^^^^^^^^^^
# Two-argument block form: errors are delivered to the block instead of
# being raised, so `err` is nil on success.
EventMachine.run {
  r.table('test').order_by(:index => 'id').em_run(conn) { |err, row|
    if err
      p [:err, err.to_s]
    else
      p [:row, row]
    end
  }
}
# frozen_string_literal: true

require 'eventmachine'
require 'rethinkdb'
include RethinkDB::Shortcuts

conn = r.connect(host: 'localhost', port: 28015)

# Callback-object form: subclass RethinkDB::Handler and em_run invokes
# the matching hook for each event in the query's lifetime.
class Printer < RethinkDB::Handler
  # Called once when the query starts.
  def on_open
    p :open
  end

  # Called once when the stream is exhausted or closed.
  def on_close
    p :closed
  end

  # Called with any runtime error instead of raising it.
  def on_error(err)
    p [:err, err.to_s]
  end

  # Called for every value produced by the query.
  def on_val(val)
    p [:val, val]
  end
end

EventMachine.run {
  r.table('test').order_by(:index => 'id').em_run(conn, Printer)
}

# Sample output
:open
[:val, {"id"=>1}]
[:val, {"id"=>2}]
[:val, {"id"=>3}]
:closed
# A more specialized handler: when the type-specific hooks below are
# defined they take precedence over the generic on_val.
class Printer < RethinkDB::Handler
  # Called once when the query starts.
  def on_open
    p :open
  end

  # Called once when the stream is exhausted or closed.
  def on_close
    p :closed
  end

  # Called with any runtime error instead of raising it.
  def on_error(err)
    p [:err, err.to_s]
  end

  # Handle arrays
  def on_array(array)
    p [:array, array]
  end

  # Handle atoms
  def on_atom(atom)
    p [:atom, atom]
  end

  # Handle individual values received from streams
  def on_stream_val(val)
    p [:stream_val, val]
  end

  # Fallback for values not claimed by a more specific hook.
  def on_val(val)
    p [:val, val]
  end
end

EventMachine.run {
  r.table('test').order_by(:index => 'id').em_run(conn, Printer)
  # print an array
  r.expr([1, 2, 3]).em_run(conn, Printer)
  # print a single row
  r.table('test').get(1).em_run(conn, Printer)
}

# Sample output
:open
[:stream_val, {"id"=>0}]
[:stream_val, {"id"=>1}]
[:stream_val, {"id"=>2}]
:closed
:open
[:array, [1, 2, 3]]
:closed
:open
[:atom, {"id"=>0}]
:closed