Module cluster.websocket_thread

Expand source code
import asyncio
import threading
import websockets
import collections
import queue
import json
import logging


class WebsocketThread(threading.Thread):
    """Initiates a thread to run a websocket connection, send messages from a `reply_queue` and receive tasks, saving
        them in a `tasks_queue`.

    .. versionadded::0.2.0
    .. versionchanged::1.1.0

    Attributes:
        stop: A boolean controlling the running state of this thread. When stop is set to True, running tasks are
            interrupted, the websocket is closed if it was open and the `run()` method returns.
            When stop is set to True by a method of this thread, an exception is added to the `exception_queue` provided
            at initialisation.
    """

    __version__ = '1.1.0'

    def __init__(self, websocket_uri: str, exception_queue: queue.Queue, add_tasks,
                 reply_queue: collections.deque, loop, authorization, connection_timeout: float):
        """
        Args:
            websocket_uri: A string containing the uri of the websocket host with which a connection should be made.
            exception_queue: A reference to a queue in which raised exceptions should be saved to be passed on to the
                caller of this thread.
            add_tasks: An instance method of the calling class that handles received messages.
            reply_queue: A queue in which the messages to be sent can be found.
            loop: The loop to be set as the asynchronous event loop of this thread.
            authorization: The value to be set as the `Authorization` header value to be sent on websocket connection
                set up.
            connection_timeout: The timeout to be set when connecting to the websocket host.
        """
        threading.Thread.__init__(self)
        self._exception_queue = exception_queue  # reference to queue to store raised exceptions to sync with caller
        self._websocket_uri = websocket_uri  # the uri of the websocket with which a connection should be made
        self._add_tasks = add_tasks  # an instance method of the calling class that adds new tasks to its collection.
        self._reply_queue = reply_queue  # reference to the waiting replies queue
        self._connection_timeout = connection_timeout   # the timeout set for a websocket connection to be established
        self._websocket = None  # variable to reference the websocket connection
        self.stop = False  # variable controlling whether this thread should keep running or should return
        self._loop = loop  # reference to the current loop in which async methods should be called
        asyncio.set_event_loop(loop)

        self._authorization = authorization  # authorization value to authorize to the server
        self._headers = {"Authorization": self._authorization}

    def run(self):
        """Starts communication with the websocket host."""
        self._loop.run_until_complete(self._communicate_with_server())
        logging.debug("Thread " + self.getName() + " has stopped.")

    async def _replies_to_send(self):
        """Generates next reply to be sent and removes it from the reply queue."""
        try:
            yield self._reply_queue.popleft()
        except IndexError:
            raise StopAsyncIteration()

    async def _receive_handler(self):
        """Checks for new messages from server and processes them.

        Sets self.stop to True when the websocket raises a `ConnectionClosedError`.

        Returns: None if the websocket raises a `ConnectionClosedError`.

        Post: `self.stop` equals True.
        """
        while not self.stop:
            try:
                async for message in self._websocket:
                    await self._process_received_message(message)
            except websockets.exceptions.ConnectionClosedError as ex:
                logging.debug(ex)
                self._exception_queue.put(ex)  # pass exception to caller of this thread
                self.stop = True  # return method and stop this thread
            except Exception as e:
                logging.debug(e)

    async def _process_received_message(self, message):
        """Adds valid received tasks to waiting tasks queue."""
        logging.debug("Processing received message: " + str(message))
        self._add_tasks(message)

    async def _send_handler(self):
        """Sends replies from reply queue.

        Waits 0.5s if reply could not be sent. Waits 0.05s if no replies are available.

        Post: `self.stop` equals True
        """
        while not self.stop:
            try:
                async for reply in self._replies_to_send():
                    await self._websocket.send(reply)
                    logging.debug("Reply sent: " + str(reply))
            except StopAsyncIteration as e1:
                logging.debug(e1)
                logging.debug("No replies available.")
            except RuntimeError as e2:
                logging.debug(e2)
                logging.debug("Can't handle StopAsyncIteration.")
                await asyncio.sleep(0.05)
            except Exception as e:
                logging.debug("Reply not sent.")
                logging.exception(e)
                await asyncio.sleep(0.5)

    async def _handler(self):
        """Lets sender and receiver handlers work asynchronously."""
        receive_task = asyncio.ensure_future(
            self._receive_handler())
        send_task = asyncio.ensure_future(
            self._send_handler())
        done, pending = await asyncio.wait(
            [receive_task, send_task],
            return_when=asyncio.FIRST_COMPLETED,
        )

        for task in pending:
            task.cancel()
            logging.debug("Task cancelled: " + str(task))

    async def _communicate_with_server(self):
        """Keeps websocket connection running.

        Tries to connect to the websocket server with a timeout equal to `self._connection_timeout`. When the connection
        has been established a *Connection established* message is sent to the host.
        If `websockets.exceptions.InvalidMessage` is raised for the first time, the method waits 1.5s asynchronously and
        retries once afterwards. `self.stop` is set to True when `websockets.exceptions.InvalidMessage` occurs a
        second time or when another Exception is raised.

        Returns: None when `self.stop` equals True.

        Post: Raised exceptions in the `self._exception_queue`.
        """
        second_chance = False
        while not self.stop:
            try:
                if self._websocket is None or not self._websocket.open:
                    logging.debug("Websocket NOT connected. Trying to connect. " + str(self._connection_timeout) +
                                  "s timeout set.")
                    await asyncio.wait_for(self._connect_to_server(), self._connection_timeout)
                    # Send initialisation message to wake host.
                    await self._websocket.send(json.dumps({"msg": "Connection established."}))
                logging.debug("Connection established.")
                await self._handler()
            except websockets.exceptions.InvalidMessage as e:
                # Server responds, but not correctly: retry once
                logging.exception(e)
                if second_chance:
                    loggin.debug("Second attempt to connect to active host failed.")
                    self._exception_queue.put(e)
                    self.stop = True
                else:
                    logging.debug("Retry once to connect after invalid response message from host. Wait 1.5s.")
                    await asyncio.sleep(1.5)
                second_chance = True
            except Exception as e:
                logging.exception(e)
                self._exception_queue.put(e)
                self.stop = True
        if self.stop and self._websocket is not None and self._websocket.open:
            logging.debug("Closing websocket.")
            await self._websocket.close()
        logging.debug("Communication with server ended.")
        return

    async def _connect_to_server(self):
        """Creates a websocket connection using the uri of this websocket thread.

        Raises: - OSError when something went wrong, e.g. too many attempts to connect to the websocket host occurred.
                - InvalidURI - passed on from `websockets.client.connect` if `self._websocket_uri` is invalid.
                - InvalidHandshake - passed on from `websockets.client.connect` if the opening handshake fails.
        """
        self._websocket = await websockets.client.connect(self._websocket_uri, extra_headers=self._headers,
                                                          ping_interval=None)

Classes

class WebsocketThread (websocket_uri, exception_queue, add_tasks, reply_queue, loop, authorization, connection_timeout)

Initiates a thread to run a websocket connection, send messages from a reply_queue and receive tasks, saving them in a tasks_queue.

Added in version: 0.2.0

Changed in version: 1.1.0

Attributes

stop
A boolean controlling the running state of this thread. When stop is set to True, running tasks are interrupted, the websocket is closed if it was open and the run() method returns. When stop is set to True by a method of this thread, an exception is added to the exception_queue provided at initialisation.

Args

websocket_uri
A string containing the uri of the websocket host with which a connection should be made.
exception_queue
A reference to a queue in which raised exceptions should be saved to be passed on to the caller of this thread.
add_tasks
An instance method of the calling class that handles received messages.
reply_queue
A queue in which the messages to be sent can be found.
loop
The loop to be set as the asynchronous event loop of this thread.
authorization
The value to be set as the Authorization header value to be sent on websocket connection set up.
connection_timeout
The timeout to be set when connecting to the websocket host.
Expand source code
class WebsocketThread(threading.Thread):
    """Initiates a thread to run a websocket connection, send messages from a `reply_queue` and receive tasks, saving
        them in a `tasks_queue`.

    .. versionadded::0.2.0
    .. versionchanged::1.1.0

    Attributes:
        stop: A boolean controlling the running state of this thread. When stop is set to True, running tasks are
            interrupted, the websocket is closed if it was open and the `run()` method returns.
            When stop is set to True by a method of this thread, an exception is added to the `exception_queue` provided
            at initialisation.
    """

    __version__ = '1.1.0'

    def __init__(self, websocket_uri: str, exception_queue: queue.Queue, add_tasks,
                 reply_queue: collections.deque, loop, authorization, connection_timeout: float):
        """
        Args:
            websocket_uri: A string containing the uri of the websocket host with which a connection should be made.
            exception_queue: A reference to a queue in which raised exceptions should be saved to be passed on to the
                caller of this thread.
            add_tasks: An instance method of the calling class that handles received messages.
            reply_queue: A queue in which the messages to be sent can be found.
            loop: The loop to be set as the asynchronous event loop of this thread.
            authorization: The value to be set as the `Authorization` header value to be sent on websocket connection
                set up.
            connection_timeout: The timeout to be set when connecting to the websocket host.
        """
        threading.Thread.__init__(self)
        self._exception_queue = exception_queue  # reference to queue to store raised exceptions to sync with caller
        self._websocket_uri = websocket_uri  # the uri of the websocket with which a connection should be made
        self._add_tasks = add_tasks  # an instance method of the calling class that adds new tasks to its collection.
        self._reply_queue = reply_queue  # reference to the waiting replies queue
        self._connection_timeout = connection_timeout   # the timeout set for a websocket connection to be established
        self._websocket = None  # variable to reference the websocket connection
        self.stop = False  # variable controlling whether this thread should keep running or should return
        self._loop = loop  # reference to the current loop in which async methods should be called
        asyncio.set_event_loop(loop)

        self._authorization = authorization  # authorization value to authorize to the server
        self._headers = {"Authorization": self._authorization}

    def run(self):
        """Starts communication with the websocket host."""
        self._loop.run_until_complete(self._communicate_with_server())
        logging.debug("Thread " + self.getName() + " has stopped.")

    async def _replies_to_send(self):
        """Generates next reply to be sent and removes it from the reply queue."""
        try:
            yield self._reply_queue.popleft()
        except IndexError:
            raise StopAsyncIteration()

    async def _receive_handler(self):
        """Checks for new messages from server and processes them.

        Sets self.stop to True when the websocket raises a `ConnectionClosedError`.

        Returns: None if the websocket raises a `ConnectionClosedError`.

        Post: `self.stop` equals True.
        """
        while not self.stop:
            try:
                async for message in self._websocket:
                    await self._process_received_message(message)
            except websockets.exceptions.ConnectionClosedError as ex:
                logging.debug(ex)
                self._exception_queue.put(ex)  # pass exception to caller of this thread
                self.stop = True  # return method and stop this thread
            except Exception as e:
                logging.debug(e)

    async def _process_received_message(self, message):
        """Adds valid received tasks to waiting tasks queue."""
        logging.debug("Processing received message: " + str(message))
        self._add_tasks(message)

    async def _send_handler(self):
        """Sends replies from reply queue.

        Waits 0.5s if reply could not be sent. Waits 0.05s if no replies are available.

        Post: `self.stop` equals True
        """
        while not self.stop:
            try:
                async for reply in self._replies_to_send():
                    await self._websocket.send(reply)
                    logging.debug("Reply sent: " + str(reply))
            except StopAsyncIteration as e1:
                logging.debug(e1)
                logging.debug("No replies available.")
            except RuntimeError as e2:
                logging.debug(e2)
                logging.debug("Can't handle StopAsyncIteration.")
                await asyncio.sleep(0.05)
            except Exception as e:
                logging.debug("Reply not sent.")
                logging.exception(e)
                await asyncio.sleep(0.5)

    async def _handler(self):
        """Lets sender and receiver handlers work asynchronously."""
        receive_task = asyncio.ensure_future(
            self._receive_handler())
        send_task = asyncio.ensure_future(
            self._send_handler())
        done, pending = await asyncio.wait(
            [receive_task, send_task],
            return_when=asyncio.FIRST_COMPLETED,
        )

        for task in pending:
            task.cancel()
            logging.debug("Task cancelled: " + str(task))

    async def _communicate_with_server(self):
        """Keeps websocket connection running.

        Tries to connect to the websocket server with a timeout equal to `self._connection_timeout`. When the connection
        has been established a *Connection established* message is sent to the host.
        If `websockets.exceptions.InvalidMessage` is raised for the first time, the method waits 1.5s asynchronously and
        retries once afterwards. `self.stop` is set to True when `websockets.exceptions.InvalidMessage` occurs a
        second time or when another Exception is raised.

        Returns: None when `self.stop` equals True.

        Post: Raised exceptions in the `self._exception_queue`.
        """
        second_chance = False
        while not self.stop:
            try:
                if self._websocket is None or not self._websocket.open:
                    logging.debug("Websocket NOT connected. Trying to connect. " + str(self._connection_timeout) +
                                  "s timeout set.")
                    await asyncio.wait_for(self._connect_to_server(), self._connection_timeout)
                    # Send initialisation message to wake host.
                    await self._websocket.send(json.dumps({"msg": "Connection established."}))
                logging.debug("Connection established.")
                await self._handler()
            except websockets.exceptions.InvalidMessage as e:
                # Server responds, but not correctly: retry once
                logging.exception(e)
                if second_chance:
                    loggin.debug("Second attempt to connect to active host failed.")
                    self._exception_queue.put(e)
                    self.stop = True
                else:
                    logging.debug("Retry once to connect after invalid response message from host. Wait 1.5s.")
                    await asyncio.sleep(1.5)
                second_chance = True
            except Exception as e:
                logging.exception(e)
                self._exception_queue.put(e)
                self.stop = True
        if self.stop and self._websocket is not None and self._websocket.open:
            logging.debug("Closing websocket.")
            await self._websocket.close()
        logging.debug("Communication with server ended.")
        return

    async def _connect_to_server(self):
        """Creates a websocket connection using the uri of this websocket thread.

        Raises: - OSError when something went wrong, e.g. too many attempts to connect to the websocket host occurred.
                - InvalidURI - passed on from `websockets.client.connect` if `self._websocket_uri` is invalid.
                - InvalidHandshake - passed on from `websockets.client.connect` if the opening handshake fails.
        """
        self._websocket = await websockets.client.connect(self._websocket_uri, extra_headers=self._headers,
                                                          ping_interval=None)

Ancestors

  • threading.Thread

Methods

def run(self)

Starts communication with the websocket host.

Expand source code
def run(self):
    """Starts communication with the websocket host."""
    self._loop.run_until_complete(self._communicate_with_server())
    logging.debug("Thread " + self.getName() + " has stopped.")