How to create bi-directional messaging using AMP in Twisted/Python

File: count_server.tac

from twisted.protocols.amp import AMP
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

from count_client import Counter

application = Application('test AMP server')

endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = Counter
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

File: count_client.py

if __name__ == '__main__':
    # Re-import this file under its module name before running main().
    import count_client
    raise SystemExit(count_client.main())

from sys import stdout

from twisted.python.log import startLogging, err
from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ClientEndpoint

class Count(amp.Command):
    # Wire names are bytes (needed on Python 3, harmless on Python 2).
    arguments = [(b'n', amp.Integer())]
    response = [(b'ok', amp.Boolean())]

class Counter(amp.AMP):
    @Count.responder
    def count(self, n):
        print('received: %d' % n)
        n += 1

        if n < 10:
            # Either side may issue a command: call back to the peer.
            print('sending: %d' % n)
            self.callRemote(Count, n=n)

        return {'ok': True}

def connect():
    endpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', 8750)
    factory = Factory()
    factory.protocol = Counter
    return endpoint.connect(factory)

def main():
    startLogging(stdout)

    d = connect()
    d.addErrback(err, 'connection failed')
    d.addCallback(lambda p: p.callRemote(Count, n=1))
    d.addErrback(err, 'call failed')

    reactor.run()

Server output:

$ twistd -n -y count_server.tac
2013-03-27 11:05:18-0500 [-] Log opened.
2013-03-27 11:05:18-0500 [-] twistd 12.2.0 (/usr/bin/python 2.7.3) starting up.
2013-03-27 11:05:18-0500 [-] reactor class: twisted.internet.epollreactor.EPollReactor.
2013-03-27 11:05:18-0500 [-] Factory starting on 8750
2013-03-27 11:05:18-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:22-0500 [twisted.internet.protocol.Factory] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 1
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 2
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 3
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 4
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 5
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 6
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 7
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 8
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 9
2013-03-27 11:05:26-0500 [Counter,0,127.0.0.1] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
^C2013-03-27 11:05:31-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:31-0500 [-] (TCP Port 8750 Closed)
2013-03-27 11:05:31-0500 [-] Stopping factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:31-0500 [-] Main loop terminated.
2013-03-27 11:05:31-0500 [-] Server Shut Down.
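
The server log shows only odd values being received because the two peers take turns: each Count responder increments the value and, while it is still below 10, sends it to the other side. The sketch below simulates that exchange without Twisted or a network, which may make the pattern easier to follow. Peer and simulate are illustrative names that do not appear in the example above, and the calls here are synchronous, whereas real AMP calls are asynchronous.

```python
class Peer(object):
    """Stand-in for one end of the connection (hypothetical helper)."""

    def __init__(self, name, limit=10):
        self.name = name
        self.limit = limit
        self.other = None      # filled in once both peers exist
        self.received = []
        self.sent = []

    def count(self, n):
        # Mirrors Counter.count: record, increment, maybe call the peer back.
        self.received.append(n)
        n += 1
        if n < self.limit:
            self.sent.append(n)
            self.other.count(n)
        return {'ok': True}


def simulate(start=1, limit=10):
    server, client = Peer('server', limit), Peer('client', limit)
    server.other, client.other = client, server
    # Stands in for the client's initial callRemote(Count, n=1).
    server.count(start)
    return server, client


if __name__ == '__main__':
    server, client = simulate()
    print('server received: %r' % (server.received,))
    print('server sent:     %r' % (server.sent,))
```

The server side ends up with the odd values 1 through 9 in its received list and the even values 2 through 8 in its sent list, matching the log.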

File: ampserver.py

from twisted.protocols import amp

class Sum(amp.Command):
    # Wire names are bytes (needed on Python 3, harmless on Python 2).
    arguments = [(b'a', amp.Integer()),
                 (b'b', amp.Integer())]
    response = [(b'total', amp.Integer())]

class Divide(amp.Command):
    arguments = [(b'numerator', amp.Integer()),
                 (b'denominator', amp.Integer())]
    response = [(b'result', amp.Float())]
    # Exceptions listed here are reported to the caller under this code.
    errors = {ZeroDivisionError: b'ZERO_DIVISION'}

class Math(amp.AMP):
    def sum(self, a, b):
        total = a + b
        print('Did a sum: %d + %d = %d' % (a, b, total))
        return {'total': total}
    Sum.responder(sum)

    def divide(self, numerator, denominator):
        result = float(numerator) / denominator
        print('Divided: %d / %d = %f' % (numerator, denominator, result))
        return {'result': result}
    Divide.responder(divide)

def main():
    from twisted.internet import reactor
    from twisted.internet.protocol import Factory
    pf = Factory()
    pf.protocol = Math
    reactor.listenTCP(1234, pf)
    print('started')
    reactor.run()

if __name__ == '__main__':
    main()

File: ampclient.py

from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater
from twisted.protocols import amp
from ampserver import Sum, Divide

connection = None

class MathClient(amp.AMP):
    def connectionMade(self):
        global connection
        connection = self

class MathFactory(protocol.ReconnectingClientFactory):
    protocol = MathClient

if __name__ == '__main__':
    reactor.connectTCP('127.0.0.1', 1234, MathFactory())

    def simpleSum():
        global connection
        d = connection.callRemote(Sum, a=1, b=5)
        def prin(result):
            print(result)
        d.addCallback(prin)
        return d

    deferLater(reactor, 1, simpleSum)
    deferLater(reactor, 3, simpleSum)
    deferLater(reactor, 6, simpleSum)
    deferLater(reactor, 9, simpleSum)
    deferLater(reactor, 12, simpleSum)
    deferLater(reactor, 15, simpleSum)
    deferLater(reactor, 18, simpleSum).addCallback(lambda _: reactor.stop())
    reactor.run()

Suggestion : 2

AMP is a bidirectional command/response-oriented protocol intended to be extended with application-specific request types and handlers. Various simple data types are supported and support for new data types can be added by applications. Either side of an AMP connection can issue a command to the other side. Each kind of command is represented as a subclass of Command. A Command defines arguments, response values, and error conditions. If no box receiver is passed in, AMP acts as its own box receiver. It handles boxes by treating them as command requests or responses and delivering them to the appropriate responder or as a result to a callRemote Deferred. The logic for handling a command can be specified as an object separate from the AMP instance which interprets and formats bytes over the network.

from twisted.protocols.amp import AMP
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

application = Application("basic AMP server")

endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = AMP
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

File: basic_client.py

if __name__ == '__main__':
    import basic_client
    raise SystemExit(basic_client.main())

from sys import stdout

from twisted.python.log import startLogging, err
from twisted.protocols.amp import AMP
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ClientEndpoint

def connect():
    endpoint = TCP4ClientEndpoint(reactor, "127.0.0.1", 8750)
    factory = Factory()
    factory.protocol = AMP
    return endpoint.connect(factory)

def main():
    startLogging(stdout)

    d = connect()
    d.addErrback(err, "Connection failed")
    def done(ignored):
        reactor.stop()
    d.addCallback(done)

    reactor.run()

from twisted.protocols.amp import Integer, String, Unicode, Command

class UsernameUnavailable(Exception):
    pass

class RegisterUser(Command):
    # Wire names are bytes (needed on Python 3, harmless on Python 2).
    arguments = [(b'username', Unicode()),
                 (b'publickey', String())]

    response = [(b'uid', Integer())]

    errors = {UsernameUnavailable: b'username-unavailable'}

The logic for handling a command can be specified as an object separate from the AMP instance which interprets and formats bytes over the network.

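from twisted.protocols.amp import CommandLocator
from twisted.python.filepath import FilePath

# RegisterUser and UsernameUnavailable are the definitions shown earlier.
# Redefining UsernameUnavailable here would create a different class that
# RegisterUser.errors does not know about.
class UserRegistration(CommandLocator):
    uidCounter = 0

    @RegisterUser.responder
    def register(self, username, publickey):
        path = FilePath(username)
        if path.exists():
            raise UsernameUnavailable()
        self.uidCounter += 1
        # publickey arrives as bytes, so build the file content as bytes.
        path.setContent(b'%d %s\n' % (self.uidCounter, publickey))
        # A responder returns a dict keyed by the declared response names.
        return {'uid': self.uidCounter}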

When you define a separate CommandLocator subclass, use it by passing an instance of it to the AMP initializer.

factory = Factory()
factory.protocol = lambda: AMP(locator=UserRegistration())

File: command_client.py

if __name__ == '__main__':
    import command_client
    raise SystemExit(command_client.main())

from sys import stdout

from twisted.python.log import startLogging, err
from twisted.protocols.amp import Integer, String, Unicode, Command
from twisted.internet import reactor

from basic_client import connect

class UsernameUnavailable(Exception):
    pass

class RegisterUser(Command):
    arguments = [(b'username', Unicode()),
                 (b'publickey', String())]

    response = [(b'uid', Integer())]

    errors = {UsernameUnavailable: b'username-unavailable'}

def main():
    startLogging(stdout)

    d = connect()
    def connected(protocol):
        # Unicode() takes text; String() takes bytes.
        return protocol.callRemote(
            RegisterUser,
            username=u'alice',
            publickey=b'ssh-rsa AAAAB3NzaC1yc2 alice@actinium')
    d.addCallback(connected)

    def registered(result):
        print('Registration result: %s' % (result,))
    d.addCallback(registered)

    d.addErrback(err, "Failed to register")

    def finished(ignored):
        reactor.stop()
    d.addCallback(finished)

    reactor.run()

AMP conversations consist of an exchange of messages called boxes. A box consists of a sequence of pairs of key and value (for example, the pair username and alice). Boxes are generally represented as dict instances. Normally boxes are passed back and forth to implement the command request/response features described above. The logic for handling each box can be specified as an object separate from the AMP instance.

from zope.interface import implementer

from twisted.protocols.amp import IBoxReceiver

# The implementer decorator replaces the class-body implements() call,
# which does not work on Python 3.
@implementer(IBoxReceiver)
class BoxReflector(object):
    def startReceivingBoxes(self, boxSender):
        self.boxSender = boxSender

    def ampBoxReceived(self, box):
        # Echo every box straight back to the peer.
        self.boxSender.sendBox(box)

    def stopReceivingBoxes(self, reason):
        self.boxSender = None

To use a custom IBoxReceiver, pass it to the AMP initializer.

factory = Factory()
factory.protocol = lambda: AMP(boxReceiver=BoxReflector())
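
To make the idea of a box more concrete, here is a rough sketch of how a box of key/value pairs can be laid out as bytes. It follows the AMP wire format as I understand it: each key and each value is preceded by a two-byte big-endian length, and an empty key ends the box. The encode_box and decode_box functions are illustrative names, not Twisted APIs. Twisted's own implementation in twisted.protocols.amp is the reference, and the _ask and _command keys in the usage note are the ones Twisted uses for command requests.

```python
import struct


def encode_box(box):
    """Serialize a dict of bytes -> bytes using AMP-style length prefixes."""
    out = []
    for key in sorted(box):
        value = box[key]
        if not 0 < len(key) <= 255:
            raise ValueError('key length must be between 1 and 255 bytes')
        if len(value) > 65535:
            raise ValueError('value is too long for a 16-bit length prefix')
        out.append(struct.pack('!H', len(key)) + key)
        out.append(struct.pack('!H', len(value)) + value)
    out.append(b'\x00\x00')   # an empty key marks the end of the box
    return b''.join(out)


def decode_box(data):
    """Parse one box from the front of data; return (box, remaining bytes)."""
    box, offset = {}, 0
    while True:
        (key_length,) = struct.unpack_from('!H', data, offset)
        offset += 2
        if key_length == 0:
            return box, data[offset:]
        key = data[offset:offset + key_length]
        offset += key_length
        (value_length,) = struct.unpack_from('!H', data, offset)
        offset += 2
        box[key] = data[offset:offset + value_length]
        offset += value_length
```

For example, a request for a Count command with n=1 could be encoded with encode_box({b'_ask': b'1', b'_command': b'Count', b'n': b'1'}), and decode_box applied to the result gives back the same dict.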

Suggestion : 3

I will assume you have installed relatively recent versions of Python and Twisted. The examples were developed with Python 2.5 and Twisted 8.2.0.

This is the end of Part 1. In Part 2, we will write some network programs, both blocking and non-blocking, as simply as possible (without using Twisted), to get a feel for how an asynchronous Python program actually works.

And moving outside of Python, there are plenty of other languages and systems that are either based around, or make use of, the asynchronous programming model. Your knowledge of Twisted will continue to serve you as you explore the wider areas of this subject.

Twisted is not the only reactor framework that uses callbacks. The older asynchronous Python frameworks Medusa and asyncore also use them, as do the GUI toolkits GTK and Qt, both based, like many GUI frameworks, on a reactor loop.

The example code is available as a zip or tar file or as a clone of my public git repository. If you can use git or another version control system that can read git repositories, then I recommend using that method as I will update the examples over time and it will be easier for you to stay current. As a bonus, it includes the SVG source files used to generate the figures. Here is the git command to clone the repository:

git clone git://github.com/jdavisp3/twisted-intro.git

The basic slow poetry server is implemented in blocking-server/slowpoetry.py. You can run one instance of the server like this:

python blocking-server/slowpoetry.py poetry/ecstasy.txt

That command will start up the blocking server with John Donne's poem "Ecstasy" as the poem to serve. Go ahead and look at the source code to the blocking server now. As you can see, it does not use Twisted, only basic Python socket operations. It also sends a limited number of bytes at a time, with a fixed time delay between them. By default, it sends 10 bytes every 0.1 seconds, but you can change these parameters with the --num-bytes and --delay command line options. For example, to send 50 bytes every 5 seconds:

python blocking-server/slowpoetry.py --num-bytes 50 --delay 5 poetry/ecstasy.txt

If you have the netcat program available, you could test the above command like this:

netcat localhost 10000
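
The actual server lives in blocking-server/slowpoetry.py and is not reproduced here. As a rough sketch of the behaviour described above (a few bytes at a time with a fixed delay, using only plain sockets), something like the following would do. The function names are made up for this illustration, and the defaults of 10 bytes, 0.1 seconds and port 10000 are taken from the text and the netcat command rather than from the real source.

```python
import socket
import time


def chunks(data, num_bytes):
    """Yield successive slices of data, each at most num_bytes long."""
    for start in range(0, len(data), num_bytes):
        yield data[start:start + num_bytes]


def send_slowly(sock, data, num_bytes=10, delay=0.1):
    """Send data over sock a few bytes at a time, pausing between pieces."""
    for piece in chunks(data, num_bytes):
        sock.sendall(piece)
        time.sleep(delay)


def serve_once(poem, port=10000, num_bytes=10, delay=0.1):
    """Accept a single connection, dribble the poem out, then close."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('127.0.0.1', port))
    listener.listen(1)
    try:
        conn, _addr = listener.accept()   # blocks until a client connects
        try:
            send_slowly(conn, poem, num_bytes, delay)
        finally:
            conn.close()
    finally:
        listener.close()
```

Because accept, sendall and sleep all block, this kind of server can deal with only one client at a time, which is the limitation the asynchronous versions are meant to address.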

Also in the example code is a blocking client which can download poems from multiple servers, one after another. Let's give our client three tasks to perform, as in Figure 1 from Part 1. First we'll start three servers, serving three different poems. Run these commands in three different terminal windows:

python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt --num-bytes 30
python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt
python blocking-server/slowpoetry.py --port 10002 poetry/science.txt

This program is in basic-twisted/hello.py. If you run it, you will see this output:

Starting the reactor.
Hello from the reactor loop!
Lately I feel like I'm stuck in a rut.

You can find it in basic-twisted/stack.py and it prints out something like this:

The python stack:
...
reactor.run() <-- This is where we called the reactor
...
...  <-- A bunch of Twisted function calls
...
traceback.print_stack() <-- The second line in the stack function

Here is the output of our countdown program:

Start!
5...
4...
3...
2...
1...
Stop!

Although Twisted is probably more often used to write servers, clients are simpler than servers and we're starting out as simply as possible. Let's try out our first poetry client written with Twisted. The source code is in twisted-client-1/get-poetry.py. Start up some poetry servers as before:

python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt --num-bytes 30
python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt
python blocking-server/slowpoetry.py --port 10002 poetry/science.txt

And then run the client like this:

python twisted-client-1/get-poetry.py 10000 10001 10002

And you should get some output like this:

Task 1: got 60 bytes of poetry from 127.0.0.1:10000
Task 2: got 10 bytes of poetry from 127.0.0.1:10001
Task 3: got 10 bytes of poetry from 127.0.0.1:10002
Task 1: got 30 bytes of poetry from 127.0.0.1:10000
Task 3: got 10 bytes of poetry from 127.0.0.1:10002
Task 2: got 10 bytes of poetry from 127.0.0.1:10001
...
Task 1: 3003 bytes of poetry
Task 2: 623 bytes of poetry
Task 3: 653 bytes of poetry
Got 3 poems in 0:00:10.134220

You'll get some output that looks something like this:

Task 1: got 3003 bytes of poetry from 127.0.0.1:10000
Task 3: got 653 bytes of poetry from 127.0.0.1:10002
Task 2: got 623 bytes of poetry from 127.0.0.1:10001
Task 1: 3003 bytes of poetry
Task 2: 623 bytes of poetry
Task 3: 653 bytes of poetry
Got 3 poems in 0:00:10.132753

With this change the program will print a stack trace and then quit the first time it receives some data. You could run this version like so:

python twisted-client-2/get-poetry-stack.py 10000

And you will get a stack trace like this:

File "twisted-client-2/get-poetry-stack.py", line 125, in <module>
    poetry_main()

... # I removed a bunch of lines here

File ".../twisted/internet/tcp.py", line 463, in doRead  # Note the doRead callback
    return self.protocol.dataReceived(data)
File "twisted-client-2/get-poetry-stack.py", line 58, in dataReceived
    traceback.print_stack()

The other changes are all of a piece so I won't bother posting them here. You can test client 3.1 by using a port with no server like this:

python twisted-client-3/get-poetry-1.py 10004

And you'll get some output like this:

Poem failed: [Failure instance: Traceback (failure with no frames): : Connection was refused by other side: 111: Connection refused.]

This code makes a new deferred, adds a callback/errback pair with the addCallbacks method, and then fires the "normal result" chain with the callback method. Of course, it's not much of a chain since it only has a single callback, but no matter. Run the code and it produces this output:

Your poem is served:
This poem is short.
Finished

And after running that script we get this output:

No poetry for you.
Finished

Here we are passing a regular Exception to the errback method. In the errback, we are printing out the class and the error result itself. We get this output:

twisted.python.failure.Failure [Failure instance: Traceback (failure with no frames): : I have failed.]
No poetry for you.