writing an "interactive" client with twisted/autobahn websockets

  • Last Update :
  • Techknowledgy :

What do you think?

# ---------- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    """Client protocol that reports its open connection back to the factory."""

    def __init__(self, factory):
        """Remember the factory that built this protocol instance.

        :param factory: the factory object; ``onOpen`` reads its
            ``base_client`` attribute and writes its ``protocol_instance``.
        """
        # NOTE(review): the base-class initializer is not invoked here --
        # confirm the installed autobahn version does not require it.
        self.factory = factory

    def onOpen(self):
        """Publish this instance on the factory and signal the waiting client."""
        log.debug("Client connected")
        # Expose the live protocol so the owning client can send through it.
        self.factory.protocol_instance = self
        # Wake up any caller blocked on the client's connected event.
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    """Factory that builds ``_WebSocketClientProtocol`` instances.

    It also carries two back-references: ``protocol_instance`` (assigned by
    the protocol in its ``onOpen``) and ``base_client`` (expected to be
    assigned by whoever creates the factory).
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base factory and clear both back-references.

        All positional and keyword arguments are forwarded unchanged to
        ``WebSocketClientFactory.__init__``.
        """
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        """Return a new protocol bound to this factory.

        :param addr: accepted for interface compatibility; not used here.
        """
        return _WebSocketClientProtocol(self)
# ---------- end twisted ----------

class BaseWBClient(object):
    """WebSocket client that runs the reactor in a background thread.

    Messages passed to :meth:`send_message` are put on a queue and flushed
    from the reactor thread, which is scheduled via
    ``reactor.callFromThread``.
    """

    def __init__(self, websocket_settings):
        """Store the settings and create the synchronisation primitives.

        :param websocket_settings: mapping used with ``%(host)s`` and
            ``%(port)d`` format specifiers in :meth:`connect`, so it must
            provide ``host`` and a numeric ``port``.
        """
        self.settings = websocket_settings
        # factory instance; created in connect()
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):
        """Create the factory, start connecting and launch the reactor thread.

        Returns right after the thread has been started; it does not wait
        for the connection to be established.
        """
        log.debug("Connecting to %(host)s:%(port)d" % self.settings)
        self.factory = _WebSocketClientFactory(
            "ws://%(host)s:%(port)d" % self.settings,
            debug=True)
        # Back-reference used by the protocol's onOpen() to signal us.
        self.factory.base_client = self
        connectWS(self.factory)
        # The positional False is passed to reactor.run(); presumably this is
        # installSignalHandlers, which must be off outside the main thread --
        # confirm against the Twisted documentation.
        self._reactor_thread = threading.Thread(target=reactor.run,
                                                args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        """Queue ``body`` and schedule a dispatch in the reactor thread.

        The message is silently dropped when :meth:`_check_connection`
        reports that the connection was not established.

        :param body: payload handed to the protocol's ``sendMessage``.
        """
        if not self._check_connection():
            return
        log.debug("Queing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        """Wait up to 10 seconds for the connected event.

        :returns: ``True`` when the event is set in time; otherwise logs an
            error, calls :meth:`close` and returns ``False``.
        """
        if not self._connected_event.wait(timeout=10):
            log.error("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        """Send every queued message through the factory's protocol instance."""
        log.debug("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                # Queue drained: nothing left to send.
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        """Ask the reactor to stop, scheduling the call in the reactor thread."""
        reactor.callFromThread(reactor.stop)

Suggestion : 2

Python has a clean syntax, one that reads much like the English language. Engineers new to Python can easily understand where they stand in the development process. The simplicity of writing it allows quick application building. Instead of searching through your disk by hand to see if there is a duplicate, you can automate the process by writing a program that recursively walks the disk and removes all the duplicates it finds, and that's what this article is about. The answer is hashing: with hashing we can generate a string of letters and numbers that acts as the identity of a given file, and if we find any other file with the same identity we delete it. Python can do so many things that developers can't run out of reasons to admire it. Python application development isn't restricted to web and enterprise applications. It is exceptionally adaptable and superb for a wide range of uses.

Here is a simple WebSocket Echo Server that will echo back any WebSocket message received:

from autobahn.twisted.websocket
import WebSocketServerProtocol
# or: from autobahn.asyncio.websocket
import WebSocketServerProtocol

class MyServerProtocol(WebSocketServerProtocol):
    """WebSocket echo server: every received message is sent straight back."""

    def onConnect(self, request):
        """Print the ``peer`` attribute of the connecting client's request."""
        print("Client connecting: {}".format(request.peer))

    def onOpen(self):
        """Print a notice that the WebSocket connection is open."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Print a summary of the message, then echo it back.

        :param payload: the message; its length is printed for binary
            messages, otherwise it is decoded as UTF-8 and printed.
        :param isBinary: whether the message is binary; passed on unchanged
            when echoing.
        """
        if isBinary:
            print("Binary message received: {} bytes".format(len(payload)))
        else:
            print("Text message received: {}".format(payload.decode('utf8')))

        # echo back message verbatim
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Print the reason the connection was closed."""
        print("WebSocket connection closed: {}".format(reason))
  1. subscribe to a topic
  2. publish an event
  3. register a procedure
  4. call a procedure
from autobahn.twisted.wamp
import ApplicationSession
# or: from autobahn.asyncio.wamp
import ApplicationSession

class MyComponent(ApplicationSession):
    """WAMP session that demonstrates subscribe, publish, register and call."""

    @inlineCallbacks
    def onJoin(self, details):
        """Run the four demo actions in order.

        :param details: accepted for interface compatibility; not used here.
        """

        # 1. subscribe to a topic so we receive events
        def onevent(msg):
            print("Got event: {}".format(msg))

        yield self.subscribe(onevent, 'com.myapp.hello')

        # 2. publish an event to a topic
        self.publish('com.myapp.hello', 'Hello, world!')

        # 3. register a procedure for remote calling
        def add2(x, y):
            return x + y

        self.register(add2, 'com.myapp.add2')

        # 4. call a remote procedure
        res = yield self.call('com.myapp.add2', 2, 3)
        print("Got result: {}".format(res))

To install:

pip install autobahn[xbr]

or (asyncio, with more bells and whistles)

pip install autobahn[asyncio, encryption, serialization, xbr]

To install Autobahn with all available serializers:

pip install autobahn[serializers]

Here is a simple WebSocket Echo Server that will echo back any WebSocket message received:

from autobahn.twisted.websocket
import WebSocketServerProtocol
# or: from autobahn.asyncio.websocket
import WebSocketServerProtocol

class MyServerProtocol(WebSocketServerProtocol):
    """WebSocket echo server: every received message is sent straight back."""

    def onConnect(self, request):
        """Print the ``peer`` attribute of the connecting client's request."""
        print("Client connecting: {}".format(request.peer))

    def onOpen(self):
        """Print a notice that the WebSocket connection is open."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Print a summary of the message, then echo it back.

        :param payload: the message; its length is printed for binary
            messages, otherwise it is decoded as UTF-8 and printed.
        :param isBinary: whether the message is binary; passed on unchanged
            when echoing.
        """
        if isBinary:
            print("Binary message received: {} bytes".format(len(payload)))
        else:
            print("Text message received: {}".format(payload.decode('utf8')))

        # echo back message verbatim
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Print the reason the connection was closed."""
        print("WebSocket connection closed: {}".format(reason))
  1. subscribe to a topic
  2. publish an event
  3. register a procedure
  4. call a procedure
from autobahn.twisted.wamp
import ApplicationSession
# or: from autobahn.asyncio.wamp
import ApplicationSession

class MyComponent(ApplicationSession):
    """WAMP session that demonstrates subscribe, publish, register and call."""

    @inlineCallbacks
    def onJoin(self, details):
        """Run the four demo actions in order.

        :param details: accepted for interface compatibility; not used here.
        """

        # 1. subscribe to a topic so we receive events
        def onevent(msg):
            print("Got event: {}".format(msg))

        yield self.subscribe(onevent, 'com.myapp.hello')

        # 2. publish an event to a topic
        self.publish('com.myapp.hello', 'Hello, world!')

        # 3. register a procedure for remote calling
        def add2(x, y):
            return x + y

        self.register(add2, 'com.myapp.add2')

        # 4. call a remote procedure
        res = yield self.call('com.myapp.add2', 2, 3)
        print("Got result: {}".format(res))

To install:

pip install autobahn[xbr]

or (asyncio, with more bells and whistles)

pip install autobahn[asyncio, encryption, serialization, xbr]

To install Autobahn with all available serializers:

pip install autobahn[serializers]

Instead of creating a temporary variable to hold one of the values while swapping, you can do this:

>>> FirstName = "kalebu"
>>> LastName = "Jordan"
>>> FirstName, LastName = LastName, FirstName
>>> print(FirstName, LastName)
('Jordan', 'kalebu')

Suggestion : 3

dep: python interactive high-level object-oriented language (default version) , dep: python-twisted (>= 11.1) Event-based framework for internet applications (dependency package) , dep: python-lz4 (>= 0.2.1) Python interface to the lz4 compression library , dep: python-qrcode QR Code image generator library - Python 2.7

Autobahn|Python is a networking library that is part of the Autobahn project and provides implementations of:

 * The WebSocket Protocol
 * The Web Application Messaging Protocol (WAMP)

Suggestion : 4

This is the cmd class that sends input to the websocket client: Your command class creates a new, unconnected protocol instance and then tries to use it as if it were connected. This is how I'm calling the websocket client and cmd loop: Later on you have code that actually connects a protocol to something:

These are the relevant imports and setup:

from cmd
import Cmd
from crochet
import setup, run_in_reactor, wait_for, retrieve_result, TimeoutError

# Setup crochet before importing twisted
setup()

from twisted.internet
import reactor, ssl
from twisted.python
import log
from autobahn.twisted.websocket
import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS

This is the websocket client protocol class:

class MyClientProtocol(WebSocketClientProtocol):
    """WebSocket client protocol that prints text messages and sends JSON tasks."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to the base-class initializer."""
        super(MyClientProtocol, self).__init__(*args, **kwargs)

    def onConnect(self, response):
        """Print a notice that the client has connected."""
        print("Connected")

    def onMessage(self, payload, isBinary):
        """Print text messages decoded as UTF-8; binary messages are ignored."""
        if not isBinary:
            print('Message received: {}'.format(payload.decode('utf8')))

    def sendTask(self, payload):
        """Serialise ``payload`` to UTF-8 encoded JSON and send it.

        :param payload: object passed to ``json.dumps``.
        """
        # ensure_ascii=False keeps non-ASCII characters as-is in the JSON text.
        payload = json.dumps(payload, ensure_ascii=False).encode('utf8')
        self.sendMessage(payload)

This is the websocket client factory class:

class MyClientFactory(WebSocketClientFactory):
    """Client factory with a helper that builds a configured factory."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to the base-class initializer."""
        super(MyClientFactory, self).__init__(*args, **kwargs)

    def buildFactory(self, uri, headers):
        """Create and return a new factory whose protocol is ``MyClientProtocol``.

        Note that the returned object is a separate ``WebSocketClientFactory``,
        not ``self``.

        :param uri: passed as the first argument to ``WebSocketClientFactory``.
        :param headers: passed as the ``headers`` keyword argument.
        """
        factory = WebSocketClientFactory(uri, headers=headers)
        factory.protocol = MyClientProtocol
        return factory

This is how I'm calling the websocket client and cmd loop:

if __name__ == '__main__':

    @run_in_reactor
    def start_connectWS():
        """Build the client factory and start the WebSocket connection."""
        headers = {
            'header1': 'value1'
        }
        f = MyClientFactory()
        # NOTE(review): `uri` is not defined in the visible code -- it must be
        # provided elsewhere before this runs.
        connectStatement = f.buildFactory(uri, headers)
        # An SSL context factory is only supplied for secure connections.
        contextFactory = (ssl.ClientContextFactory()
                          if connectStatement.isSecure else None)
        connectWS(connectStatement, contextFactory)

    start_connectWS()
    mycmd().cmdloop()

This is the exception:

Unhandled Error
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/tomd/project/lib/python3.7/site-packages/crochet/_eventloop.py", line 412, in <lambda>
    target=lambda: self._reactor.run(installSignalHandlers=False),
  File "/Users/tomd/project/lib/python3.7/site-packages/twisted/internet/base.py", line 1283, in run
    self.mainLoop()
  File "/Users/tomd/project/lib/python3.7/site-packages/twisted/internet/base.py", line 1292, in mainLoop
    self.runUntilCurrent()
--- <exception caught here> ---
  File "/Users/tomd/project/lib/python3.7/site-packages/twisted/internet/base.py", line 886, in runUntilCurrent
    f(*a, **kw)
  File "./client.py", line 62, in sendTask
    self.sendMessage(payload)
  File "/Users/tomd/project/lib/python3.7/site-packages/autobahn/websocket/protocol.py", line 2215, in sendMessage
    if self.state != WebSocketProtocol.STATE_OPEN:
builtins.AttributeError: 'MyClientProtocol' object has no attribute 'state'

Your command class creates a new, unconnected protocol instance and then tries to use it as if it were connected:

class mycmd(Cmd):
    """Command loop whose ``send`` command forwards input to the websocket client."""

    def do_send(self, inp):
        """Wrap ``inp`` in a task payload and schedule it to be sent.

        :param inp: the text the user typed after ``send``.
        """
        payload = {
            'task': inp
        }
        # NOTE: this instantiates a brand-new protocol object rather than
        # reusing one attached to a connection; sendMessage on it fails with
        # "'MyClientProtocol' object has no attribute 'state'".
        m = MyClientProtocol()
        reactor.callFromThread(m.sendTask, payload)

Specifically, this creates a new instance of your protocol class:

        m = MyClientProtocol()

And this tries to use it as if it were connected:

        reactor.callFromThread(m.sendTask, payload)

For example, MyClientProtocol.onConnect can set itself as an attribute on the factory instance, and your command line code could accept the factory instance as an argument, then read the connected protocol instance from the attribute.

class MyClientProtocol(...):
    def onConnect(self, response):
        # Publish the connected instance on the factory so other code can
        # reach it.
        self.factory.connectedProtocol = self
    ...


class mycmd(Cmd):
    # ... __init__ that accepts factory and sets it on self

    def do_send(self, inp):
        """Send ``inp`` as a task over the connected protocol, if there is one.

        Prints "No connection" when the factory has no connected protocol.

        :param inp: the text the user typed after ``send``.
        """
        payload = {
            'task': inp
        }
        # getattr with a default: unless the factory initialises the
        # attribute itself, it does not exist before onConnect has run.
        m = getattr(self.factory, 'connectedProtocol', None)
        if m is None:
            print("No connection")
        else:
            reactor.callFromThread(m.sendTask, payload)