Module cluster.connector

Expand source code
import time
import json
import queue
import collections
from . import websocket_thread
import asyncio
import logging
# import sys
from enum import Enum
# logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

class Actions(Enum):
    """Enumeration of recognized actions.

    .. versionadded::0.1.0
    .. versionchanged::1.0.0

    The actions that are recognized by the connector and therefore can be returned are enumerated in this class.
    To loop through all of the actions in this enumeration, simply use

        for action in Actions:
               # do something with action

    MATCH_QUESTIONS = "match_questions"
    """Match questions."""

    ESTIMATE_OFFENSIVENESS = "estimate_offensiveness"
    """Estimate the offensiveness of a question."""

    NO_WORK = "no_work"
    """There server has no tasks to process."""

    IS_NONSENSE = "is_nonsense"
    """Find out if a string contains nonsense
    .. versionadded::1.0.0

    def has_value(cls, value):
        return value in cls._value2member_map_

class Connector(object):
    """Allows communication with Cluster API server.

    This Connector class allows communication with the Cluster API server by returning NLP tasks
    from the server whenever any are available and by replying with a response.

    .. versionadded::0.1.0
    .. versionchanged::0.2.0
    .. versionchanged::1.0.0
    .. versionchanged::1.1.0

        Exception: Something went wrong while trying to communicate with the server. The range of these exceptions is
            mostly focused on `OSError` and `websockets.exceptions.InvalidMessage`, but is not limited to those.

        To enable logging of debugging messages, use the following statements:
            >> import logging, sys
            >> logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    necessary_task_keys = {"msg_id", "action"}
    """Set of keys that have to be in a task dictionary to be a valid task.
    .. versionadded::0.2.0

    _generic_actions = {Actions.ESTIMATE_OFFENSIVENESS, Actions.IS_NONSENSE}
    """Set of actions that can be applied on both questions and answers.
    .. versionadded::1.0.0

    __version__ = '1.1.0'

    def __init__(self, websocket_uri="wss://",
                 websocket_connection_timeout=10, authorization="843iu233d3m4pxb1"):
            websocket_uri: A custom uri referencing the websocket host that should be used.

            websocket_connection_timeout: The timeout to be set for the websocket connection before giving up. By
                default set to 10 seconds.

            authorization: The authorization to be used to get access to the websocket host.
        self._tasks = list()  # store non processed received tasks
        self._tasks_in_progress = dict()  # keep track of work in progress

        self._websocket_connection_timeout = websocket_connection_timeout
        self._websocket_uri = websocket_uri
        self._authorization = authorization
        self._reply_queue = collections.deque()  # keep list of replies to send
        self._websocket_thread = None
        self._websocket_exceptions = queue.Queue()  # queue to keep exceptions thrown by websocket thread

    def reset_connection(self):
        """Resets the websocket thread.

        .. versionadded::0.2.0

    def _init_websocket_thread(self):
        """Initialize a new thread running a websocket connection.

            In case a websocket thread had been assigned before, the previous websocket thread is stopped and a new
            websocket thread is started.
            `self._websocket_thread` equals the newly assigned websocket thread.
        if self._websocket_thread is not None:
            self._websocket_thread.stop = True
        # Clear exceptions in case any are still in the queue
        logging.debug("Clearing exception queue.")
        with self._websocket_exceptions.mutex:
        # Let asynchronous websocket run in separate thread, so it doesn't block
        logging.debug("Starting new thread.")
        self._websocket_thread = websocket_thread.WebsocketThread(self._websocket_uri, self._websocket_exceptions,
                                                                  self._reply_queue, asyncio.get_event_loop(),
        logging.debug("Thread " + self._websocket_thread.getName() + " started.")

    def _checkout_websocket(self):
        """Checks whether the websocket thread is still alive and whether it has passed exceptions.

            Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
        # check if websocket still alive and hasn't thrown any exceptions
        if not self._websocket_exceptions.empty():
            # Websocket thread passed an exception.
            exception = self._websocket_exceptions.get()
            self._websocket_thread.stop = True
            logging.debug("An exception occurred in the websocket thread.")
            raise exception
        elif self._websocket_thread is None or not self._websocket_thread.is_alive():
            logging.debug("Reinitializing websocket thread.")

    def _add_tasks(self, message):
        """Parses a given response and adds tasks from message to the queue if needed."""
        received_tasks = Connector._parse_response(message)
        for task in received_tasks:

    def _add_task(self, task):
        """Adds the given task to the task queue if it is valid and not yet in the task or tasks in progress queue."""
        if not Connector.is_valid_task(task):
            logging.debug("Task with invalid structure received: " + str(task))
        elif task not in self._tasks and task['msg_id'] not in self._tasks_in_progress:
            # only add task if valid and not in the (progress) task list already
            logging.debug("Task added: " + str(task))
            # task already received
            logging.debug("Message id " + str(task['msg_id']) + " already in task or tasks in progress queue.")

    def has_task(self) -> bool:
        """Checks whether the server has any tasks available.

        .. versionadded::0.1.0
        .. versionchanged::0.2.0
        .. versionchanged::1.0.0

        Checks whether the web socket connection is still alive and whether any tasks are available in the cache.

            True if and only if there is a task to be processed.

            Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
        return len(self._tasks) > 0

    def get_next_task(self, timeout: float = None) -> any:
        Waits for the next task from the server and returns it as a dictionary.

        Waits until the server has delivered a task or until timeout if a timeout is set.

        .. versionadded::0.1.0
        .. versionchanged::0.2.0
        .. versionchanged::1.0.0

            timeout: The number of seconds to wait before returning without result. In case the timeout is set to None,
                then the method will only return upon receiving a task from the server.

        Currently two possible JSON structures can be expected:

        1. The server asks to match a question with an undefined number of questions:

                    "action": Actions.MATCH_QUESTIONS,
                    "question_id": 123,
                    "question": "XXX",
                    "compare_questions": [
                            "question_id": 111,
                            "question": "AAA"
                            "question_id": 222,
                            "question": "BBB"
                            "question_id": 333,
                            "question": "CCC"
                    "msg_id": 1234567890

        2. The server asks to estimate the offensiveness of a sentence:

                    "action": Actions.ESTIMATE_OFFENSIVENESS,
                    "sentence_id": 100,
                    "sentence": "XXX",
                    "msg_id": 1234567890

        3. The server asks to check if a sentence is nonsense or not:

                    "action": Actions.IS_NONSENSE,
                    "sentence_id": 100,
                    "sentence": "XXX",
                    "msg_id": 1234567890

        Note that other keys can be present, but the keys mentioned in the example will be part of the actual result.

             A task to be processed as a JSON object or None when no task was received before timeout.

            Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
        logging.debug("Get task using websocket")
        tasks_found = len(self._tasks) > 0
        start_time = time.time()
        time_passed = 0
        while not tasks_found and (timeout is None or (time_passed < timeout)):
            tasks_found = len(self._tasks) > 0
            time_passed = time.time() - start_time  # keep track of the passed time
        if tasks_found:
            # Remove task from task list and add it to the tasks in progress list.
            task = self._tasks.pop(0)
            self._tasks_in_progress[task['msg_id']] = task
            task = None
        return task

    def close(self):
        """Sends a stop signal to the thread running the websocket connection of this connector.

        .. versionadded::0.2.0
        self._websocket_thread.stop = True

    def is_valid_task(cls, task: dict):
        """Returns True if and only if the given dictionary contains the keys that are in the `cls.necessary_task_keys`

        .. versionadded::0.2.0
        return set(task.keys()).intersection(cls.necessary_task_keys) == cls.necessary_task_keys

    def _parse_response(cls, response) -> list:
        """Processes a dictionary or a list of dictionaries received from the server and returns a list of dictionaries
         that comply to the structure of the result of `get_next_task()`.

            response: The response from the server as a dictionary or a list of dictionaries.

            A list of dictionaries that comply to the structure of the result of `get_next_task()` containing the
            information of the given `response` as far as the structure allows it.
        parsed_response = list()
            response = json.loads(response)
        except json.decoder.JSONDecodeError as e:
            response = ""
        if type(response) == list:
            for task in response:
                task = cls._parse_response_dict(task)
        elif type(response) == dict:
            task = cls._parse_response_dict(response)
        return parsed_response

    def _parse_response_dict(cls, response_dict: dict) -> dict:
        """Converts keys of given dictionary and dictionaries in a list in the given dictionary to lower case.
        Also adds sentence and sentence_id keys to replace question/answer and question_id/answer_id.
        parsed_response = dict()
        for key, value in response_dict.items():
            if type(value) == list:
                new_value = list()
                for item in value:
                    if type(item) == dict:
                        item = {k.lower(): v for k, v in item.items()}  # deepest expected nesting is this level
                value = new_value
            key = key.lower()
            parsed_response[key] = value
        if parsed_response["action"] in cls._generic_actions:
            # add generic keys sentence en sentence_id instead of answer/question
            if "question" in parsed_response.keys():
                parsed_response["sentence"] = parsed_response["question"]
                parsed_response["sentence_id"] = parsed_response["question_id"]
            elif "answer" in parsed_response.keys():
                parsed_response["sentence"] = parsed_response["answer"]
                parsed_response["sentence_id"] = parsed_response["answer_id"]
        return parsed_response

    def _parse_request(self, request: dict) -> dict:
        """Processes a dictionary received from the NLP and returns a dictionary that complies to
        structure that can be understood by the server.

            request: The request from the NLP as a dictionary.

            A dictionary that complies to the structure understood by the server containing the
            information of the given `request` as far as the structure allows it.
        parsed_request = request
        # return from generic sentence(_id) to question/answer(_id)
        original_response = self._tasks_in_progress[request["msg_id"]]
        if "question" in original_response.keys():
            parsed_request["question"] = request["sentence"]
            parsed_request["question_id"] = request["sentence_id"]
        elif "answer" in original_response.keys():
            parsed_request["answer"] = request["sentence"]
            parsed_request["answer_id"] = request["sentence_id"]
        return parsed_request

    def reply(self, response: dict):
        """Sends the given response to the server.

        .. versionadded::0.1.0
        .. versionchanged::1.0.0

        Checks whether the websocket connection is still alive and delivers the given `response` to the websocket

            response: A dictionary built like a JSON object.

            The effect of replying with a response that doesn't follow one of the below mentioned structures
            is undefined. As a response argument, currently two possible structures are allowed:

            1. A reply to a `match_question` containing a top x of comparable questions:

                        "question_id": 123,
                        "possible_matches": [
                                "question_id": 111,
                                "prob": 0.789
                                "question_id": 333,
                                "prob": 0.654
                        "msg_id": 1234567890

            2. A reply to an `estimate_offensiveness`:

                        "sentence_id": 100,
                        "prob": 0.123,
                        "msg_id": 1234567890

            3. A reply to an `is_nonsense`:

                        "sentence_id": 100,
                        "nonsense": True,
                        "msg_id": 1234567890

            The `msg_id` is always used to include in the reply so that the server knows to
            which task the reply belongs. It corresponds to the `msg_id` from a task from
            the `get_next_task()` method.

            Exception: Something went wrong while sending the reply to the server.
                This exception may become more specific in a future release, but for now it is kept as general as
                possible, so any implementation changes don't effect these specifications.
        action = self._tasks_in_progress[response['msg_id']]['action'].lower()
        if Actions.has_value(action) and response['msg_id'] in self._tasks_in_progress.keys():
            data = self._parse_request(response)
            del self._tasks_in_progress[response['msg_id']]


class Actions (*args, **kwargs)

Enumeration of recognized actions.

Added in version: 0.1.0

Changed in version: 1.0.0

The actions that are recognized by the connector and therefore can be returned are enumerated in this class. To loop through all of the actions in this enumeration, simply use

for action in Actions:
       # do something with action
Expand source code
class Actions(Enum):
    """Enumeration of recognized actions.

    .. versionadded::0.1.0
    .. versionchanged::1.0.0

    The actions that are recognized by the connector and therefore can be returned are enumerated in this class.
    To loop through all of the actions in this enumeration, simply use

        for action in Actions:
               # do something with action

    MATCH_QUESTIONS = "match_questions"
    """Match questions."""

    ESTIMATE_OFFENSIVENESS = "estimate_offensiveness"
    """Estimate the offensiveness of a question."""

    NO_WORK = "no_work"
    """There server has no tasks to process."""

    IS_NONSENSE = "is_nonsense"
    """Find out if a string contains nonsense
    .. versionadded::1.0.0

    def has_value(cls, value):
        return value in cls._value2member_map_


  • enum.Enum

Class variables


Estimate the offensiveness of a question.


Find out if a string contains nonsense

Added in version: 1.0.0


Match questions.


There server has no tasks to process.

class Connector (websocket_uri='wss://', websocket_connection_timeout=10, authorization='843iu233d3m4pxb1')

Allows communication with Cluster API server.

This Connector class allows communication with the Cluster API server by returning NLP tasks from the server whenever any are available and by replying with a response.

Added in version: 0.1.0

Changed in version: 0.2.0

Changed in version: 1.0.0

Changed in version: 1.1.0


Something went wrong while trying to communicate with the server. The range of these exceptions is mostly focused on OSError and websockets.exceptions.InvalidMessage, but is not limited to those.


To enable logging of debugging messages, use the following statements:

    >> import logging, sys
    >> logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


A custom uri referencing the websocket host that should be used.
The timeout to be set for the websocket connection before giving up. By default set to 10 seconds.
The authorization to be used to get access to the websocket host.
Expand source code
class Connector(object):
    """Allows communication with Cluster API server.

    This Connector class allows communication with the Cluster API server by returning NLP tasks
    from the server whenever any are available and by replying with a response.

    .. versionadded::0.1.0
    .. versionchanged::0.2.0
    .. versionchanged::1.0.0
    .. versionchanged::1.1.0

        Exception: Something went wrong while trying to communicate with the server. The range of these exceptions is
            mostly focused on `OSError` and `websockets.exceptions.InvalidMessage`, but is not limited to those.

        To enable logging of debugging messages, use the following statements:
            >> import logging, sys
            >> logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    necessary_task_keys = {"msg_id", "action"}
    """Set of keys that have to be in a task dictionary to be a valid task.
    .. versionadded::0.2.0

    _generic_actions = {Actions.ESTIMATE_OFFENSIVENESS, Actions.IS_NONSENSE}
    """Set of actions that can be applied on both questions and answers.
    .. versionadded::1.0.0

    __version__ = '1.1.0'

    def __init__(self, websocket_uri="wss://",
                 websocket_connection_timeout=10, authorization="843iu233d3m4pxb1"):
            websocket_uri: A custom uri referencing the websocket host that should be used.

            websocket_connection_timeout: The timeout to be set for the websocket connection before giving up. By
                default set to 10 seconds.

            authorization: The authorization to be used to get access to the websocket host.
        self._tasks = list()  # store non processed received tasks
        self._tasks_in_progress = dict()  # keep track of work in progress

        self._websocket_connection_timeout = websocket_connection_timeout
        self._websocket_uri = websocket_uri
        self._authorization = authorization
        self._reply_queue = collections.deque()  # keep list of replies to send
        self._websocket_thread = None
        self._websocket_exceptions = queue.Queue()  # queue to keep exceptions thrown by websocket thread

    def reset_connection(self):
        """Resets the websocket thread.

        .. versionadded::0.2.0

    def _init_websocket_thread(self):
        """Initialize a new thread running a websocket connection.

            In case a websocket thread had been assigned before, the previous websocket thread is stopped and a new
            websocket thread is started.
            `self._websocket_thread` equals the newly assigned websocket thread.
        if self._websocket_thread is not None:
            self._websocket_thread.stop = True
        # Clear exceptions in case any are still in the queue
        logging.debug("Clearing exception queue.")
        with self._websocket_exceptions.mutex:
        # Let asynchronous websocket run in separate thread, so it doesn't block
        logging.debug("Starting new thread.")
        self._websocket_thread = websocket_thread.WebsocketThread(self._websocket_uri, self._websocket_exceptions,
                                                                  self._reply_queue, asyncio.get_event_loop(),
        logging.debug("Thread " + self._websocket_thread.getName() + " started.")

    def _checkout_websocket(self):
        """Checks whether the websocket thread is still alive and whether it has passed exceptions.

            Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
        # check if websocket still alive and hasn't thrown any exceptions
        if not self._websocket_exceptions.empty():
            # Websocket thread passed an exception.
            exception = self._websocket_exceptions.get()
            self._websocket_thread.stop = True
            logging.debug("An exception occurred in the websocket thread.")
            raise exception
        elif self._websocket_thread is None or not self._websocket_thread.is_alive():
            logging.debug("Reinitializing websocket thread.")

    def _add_tasks(self, message):
        """Parses a given response and adds tasks from message to the queue if needed."""
        received_tasks = Connector._parse_response(message)
        for task in received_tasks:

    def _add_task(self, task):
        """Adds the given task to the task queue if it is valid and not yet in the task or tasks in progress queue."""
        if not Connector.is_valid_task(task):
            logging.debug("Task with invalid structure received: " + str(task))
        elif task not in self._tasks and task['msg_id'] not in self._tasks_in_progress:
            # only add task if valid and not in the (progress) task list already
            logging.debug("Task added: " + str(task))
            # task already received
            logging.debug("Message id " + str(task['msg_id']) + " already in task or tasks in progress queue.")

    def has_task(self) -> bool:
        """Checks whether the server has any tasks available.

        .. versionadded::0.1.0
        .. versionchanged::0.2.0
        .. versionchanged::1.0.0

        Checks whether the web socket connection is still alive and whether any tasks are available in the cache.

            True if and only if there is a task to be processed.

            Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
        return len(self._tasks) > 0

    def get_next_task(self, timeout: float = None) -> any:
        Waits for the next task from the server and returns it as a dictionary.

        Waits until the server has delivered a task or until timeout if a timeout is set.

        .. versionadded::0.1.0
        .. versionchanged::0.2.0
        .. versionchanged::1.0.0

            timeout: The number of seconds to wait before returning without result. In case the timeout is set to None,
                then the method will only return upon receiving a task from the server.

        Currently two possible JSON structures can be expected:

        1. The server asks to match a question with an undefined number of questions:

                    "action": Actions.MATCH_QUESTIONS,
                    "question_id": 123,
                    "question": "XXX",
                    "compare_questions": [
                            "question_id": 111,
                            "question": "AAA"
                            "question_id": 222,
                            "question": "BBB"
                            "question_id": 333,
                            "question": "CCC"
                    "msg_id": 1234567890

        2. The server asks to estimate the offensiveness of a sentence:

                    "action": Actions.ESTIMATE_OFFENSIVENESS,
                    "sentence_id": 100,
                    "sentence": "XXX",
                    "msg_id": 1234567890

        3. The server asks to check if a sentence is nonsense or not:

                    "action": Actions.IS_NONSENSE,
                    "sentence_id": 100,
                    "sentence": "XXX",
                    "msg_id": 1234567890

        Note that other keys can be present, but the keys mentioned in the example will be part of the actual result.

             A task to be processed as a JSON object or None when no task was received before timeout.

            Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
        logging.debug("Get task using websocket")
        tasks_found = len(self._tasks) > 0
        start_time = time.time()
        time_passed = 0
        while not tasks_found and (timeout is None or (time_passed < timeout)):
            tasks_found = len(self._tasks) > 0
            time_passed = time.time() - start_time  # keep track of the passed time
        if tasks_found:
            # Remove task from task list and add it to the tasks in progress list.
            task = self._tasks.pop(0)
            self._tasks_in_progress[task['msg_id']] = task
            task = None
        return task

    def close(self):
        """Sends a stop signal to the thread running the websocket connection of this connector.

        .. versionadded::0.2.0
        self._websocket_thread.stop = True

    def is_valid_task(cls, task: dict):
        """Returns True if and only if the given dictionary contains the keys that are in the `cls.necessary_task_keys`

        .. versionadded::0.2.0
        return set(task.keys()).intersection(cls.necessary_task_keys) == cls.necessary_task_keys

    def _parse_response(cls, response) -> list:
        """Processes a dictionary or a list of dictionaries received from the server and returns a list of dictionaries
         that comply to the structure of the result of `get_next_task()`.

            response: The response from the server as a dictionary or a list of dictionaries.

            A list of dictionaries that comply to the structure of the result of `get_next_task()` containing the
            information of the given `response` as far as the structure allows it.
        parsed_response = list()
            response = json.loads(response)
        except json.decoder.JSONDecodeError as e:
            response = ""
        if type(response) == list:
            for task in response:
                task = cls._parse_response_dict(task)
        elif type(response) == dict:
            task = cls._parse_response_dict(response)
        return parsed_response

    def _parse_response_dict(cls, response_dict: dict) -> dict:
        """Converts keys of given dictionary and dictionaries in a list in the given dictionary to lower case.
        Also adds sentence and sentence_id keys to replace question/answer and question_id/answer_id.
        parsed_response = dict()
        for key, value in response_dict.items():
            if type(value) == list:
                new_value = list()
                for item in value:
                    if type(item) == dict:
                        item = {k.lower(): v for k, v in item.items()}  # deepest expected nesting is this level
                value = new_value
            key = key.lower()
            parsed_response[key] = value
        if parsed_response["action"] in cls._generic_actions:
            # add generic keys sentence en sentence_id instead of answer/question
            if "question" in parsed_response.keys():
                parsed_response["sentence"] = parsed_response["question"]
                parsed_response["sentence_id"] = parsed_response["question_id"]
            elif "answer" in parsed_response.keys():
                parsed_response["sentence"] = parsed_response["answer"]
                parsed_response["sentence_id"] = parsed_response["answer_id"]
        return parsed_response

    def _parse_request(self, request: dict) -> dict:
        """Processes a dictionary received from the NLP and returns a dictionary that complies to
        structure that can be understood by the server.

            request: The request from the NLP as a dictionary.

            A dictionary that complies to the structure understood by the server containing the
            information of the given `request` as far as the structure allows it.
        parsed_request = request
        # return from generic sentence(_id) to question/answer(_id)
        original_response = self._tasks_in_progress[request["msg_id"]]
        if "question" in original_response.keys():
            parsed_request["question"] = request["sentence"]
            parsed_request["question_id"] = request["sentence_id"]
        elif "answer" in original_response.keys():
            parsed_request["answer"] = request["sentence"]
            parsed_request["answer_id"] = request["sentence_id"]
        return parsed_request

    def reply(self, response: dict):
        """Sends the given response to the server.

        .. versionadded::0.1.0
        .. versionchanged::1.0.0

        Checks whether the websocket connection is still alive and delivers the given `response` to the websocket

            response: A dictionary built like a JSON object.

            The effect of replying with a response that doesn't follow one of the below mentioned structures
            is undefined. As a response argument, currently two possible structures are allowed:

            1. A reply to a `match_question` containing a top x of comparable questions:

                        "question_id": 123,
                        "possible_matches": [
                                "question_id": 111,
                                "prob": 0.789
                                "question_id": 333,
                                "prob": 0.654
                        "msg_id": 1234567890

            2. A reply to an `estimate_offensiveness`:

                        "sentence_id": 100,
                        "prob": 0.123,
                        "msg_id": 1234567890

            3. A reply to an `is_nonsense`:

                        "sentence_id": 100,
                        "nonsense": True,
                        "msg_id": 1234567890

            The `msg_id` is always used to include in the reply so that the server knows to
            which task the reply belongs. It corresponds to the `msg_id` from a task from
            the `get_next_task()` method.

            Exception: Something went wrong while sending the reply to the server.
                This exception may become more specific in a future release, but for now it is kept as general as
                possible, so any implementation changes don't effect these specifications.
        action = self._tasks_in_progress[response['msg_id']]['action'].lower()
        if Actions.has_value(action) and response['msg_id'] in self._tasks_in_progress.keys():
            data = self._parse_request(response)
            del self._tasks_in_progress[response['msg_id']]

Class variables

var necessary_task_keys

Set of keys that have to be in a task dictionary to be a valid task.

Added in version: 0.2.0

Static methods

def is_valid_task(task)

Returns True if and only if the given dictionary contains the keys that are in the cls.necessary_task_keys set.

Added in version: 0.2.0

Expand source code
def is_valid_task(cls, task: dict):
    """Returns True if and only if the given dictionary contains the keys that are in the `cls.necessary_task_keys`

    .. versionadded::0.2.0
    return set(task.keys()).intersection(cls.necessary_task_keys) == cls.necessary_task_keys


def close(self)

Sends a stop signal to the thread running the websocket connection of this connector.

Added in version: 0.2.0

Expand source code
def close(self):
    """Sends a stop signal to the thread running the websocket connection of this connector.

    .. versionadded::0.2.0
    self._websocket_thread.stop = True
def get_next_task(self, timeout=None)

Waits for the next task from the server and returns it as a dictionary.

Waits until the server has delivered a task or until timeout if a timeout is set.

Added in version: 0.1.0

Changed in version: 0.2.0

Changed in version: 1.0.0


The number of seconds to wait before returning without result. In case the timeout is set to None, then the method will only return upon receiving a task from the server.

Currently two possible JSON structures can be expected:

  1. The server asks to match a question with an undefined number of questions:

        "action": Actions.MATCH_QUESTIONS,
        "question_id": 123,
        "question": "XXX",
        "compare_questions": [
                "question_id": 111,
                "question": "AAA"
                "question_id": 222,
                "question": "BBB"
                "question_id": 333,
                "question": "CCC"
        "msg_id": 1234567890
  2. The server asks to estimate the offensiveness of a sentence:

        "action": Actions.ESTIMATE_OFFENSIVENESS,
        "sentence_id": 100,
        "sentence": "XXX",
        "msg_id": 1234567890
  3. The server asks to check if a sentence is nonsense or not:

        "action": Actions.IS_NONSENSE,
        "sentence_id": 100,
        "sentence": "XXX",
        "msg_id": 1234567890

Note that other keys can be present, but the keys mentioned in the example will be part of the actual result.


A task to be processed as a JSON object or None when no task was received before timeout.


The websocket thread has passed an exception. The passed exception is raised by this method.
Expand source code
def get_next_task(self, timeout: float = None) -> any:
    Waits for the next task from the server and returns it as a dictionary.

    Waits until the server has delivered a task or until timeout if a timeout is set.

    .. versionadded::0.1.0
    .. versionchanged::0.2.0
    .. versionchanged::1.0.0

        timeout: The number of seconds to wait before returning without result. In case the timeout is set to None,
            then the method will only return upon receiving a task from the server.

    Currently two possible JSON structures can be expected:

    1. The server asks to match a question with an undefined number of questions:

                "action": Actions.MATCH_QUESTIONS,
                "question_id": 123,
                "question": "XXX",
                "compare_questions": [
                        "question_id": 111,
                        "question": "AAA"
                        "question_id": 222,
                        "question": "BBB"
                        "question_id": 333,
                        "question": "CCC"
                "msg_id": 1234567890

    2. The server asks to estimate the offensiveness of a sentence:

                "action": Actions.ESTIMATE_OFFENSIVENESS,
                "sentence_id": 100,
                "sentence": "XXX",
                "msg_id": 1234567890

    3. The server asks to check if a sentence is nonsense or not:

                "action": Actions.IS_NONSENSE,
                "sentence_id": 100,
                "sentence": "XXX",
                "msg_id": 1234567890

    Note that other keys can be present, but the keys mentioned in the example will be part of the actual result.

         A task to be processed as a JSON object or None when no task was received before timeout.

        Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
    logging.debug("Get task using websocket")
    tasks_found = len(self._tasks) > 0
    start_time = time.time()
    time_passed = 0
    while not tasks_found and (timeout is None or (time_passed < timeout)):
        tasks_found = len(self._tasks) > 0
        time_passed = time.time() - start_time  # keep track of the passed time
    if tasks_found:
        # Remove task from task list and add it to the tasks in progress list.
        task = self._tasks.pop(0)
        self._tasks_in_progress[task['msg_id']] = task
        task = None
    return task
def has_task(self)

Checks whether the server has any tasks available.

Added in version: 0.1.0

Changed in version: 0.2.0

Changed in version: 1.0.0

Checks whether the web socket connection is still alive and whether any tasks are available in the cache.


True if and only if there is a task to be processed.


The websocket thread has passed an exception. The passed exception is raised by this method.
Expand source code
def has_task(self) -> bool:
    """Checks whether the server has any tasks available.

    .. versionadded::0.1.0
    .. versionchanged::0.2.0
    .. versionchanged::1.0.0

    Checks whether the web socket connection is still alive and whether any tasks are available in the cache.

        True if and only if there is a task to be processed.

        Exception: The websocket thread has passed an exception. The passed exception is raised by this method.
    return len(self._tasks) > 0
def reply(self, response)

Sends the given response to the server.

Added in version: 0.1.0

Changed in version: 1.0.0

Checks whether the websocket connection is still alive and delivers the given response to the websocket thread.


A dictionary built like a JSON object.

The effect of replying with a response that doesn't follow one of the below mentioned structures is undefined. As a response argument, currently two possible structures are allowed:

  1. A reply to a match_question containing a top x of comparable questions:

        "question_id": 123,
        "possible_matches": [
                "question_id": 111,
                "prob": 0.789
                "question_id": 333,
                "prob": 0.654
        "msg_id": 1234567890
  2. A reply to an estimate_offensiveness:

        "sentence_id": 100,
        "prob": 0.123,
        "msg_id": 1234567890
  3. A reply to an is_nonsense:

        "sentence_id": 100,
        "nonsense": True,
        "msg_id": 1234567890

The msg_id is always used to include in the reply so that the server knows to which task the reply belongs. It corresponds to the msg_id from a task from the get_next_task() method.


Something went wrong while sending the reply to the server. This exception may become more specific in a future release, but for now it is kept as general as possible, so any implementation changes don't effect these specifications.
Expand source code
def reply(self, response: dict):
    """Sends the given response to the server.

    .. versionadded::0.1.0
    .. versionchanged::1.0.0

    Checks whether the websocket connection is still alive and delivers the given `response` to the websocket

        response: A dictionary built like a JSON object.

        The effect of replying with a response that doesn't follow one of the below mentioned structures
        is undefined. As a response argument, currently two possible structures are allowed:

        1. A reply to a `match_question` containing a top x of comparable questions:

                    "question_id": 123,
                    "possible_matches": [
                            "question_id": 111,
                            "prob": 0.789
                            "question_id": 333,
                            "prob": 0.654
                    "msg_id": 1234567890

        2. A reply to an `estimate_offensiveness`:

                    "sentence_id": 100,
                    "prob": 0.123,
                    "msg_id": 1234567890

        3. A reply to an `is_nonsense`:

                    "sentence_id": 100,
                    "nonsense": True,
                    "msg_id": 1234567890

        The `msg_id` is always used to include in the reply so that the server knows to
        which task the reply belongs. It corresponds to the `msg_id` from a task from
        the `get_next_task()` method.

        Exception: Something went wrong while sending the reply to the server.
            This exception may become more specific in a future release, but for now it is kept as general as
            possible, so any implementation changes don't effect these specifications.
    action = self._tasks_in_progress[response['msg_id']]['action'].lower()
    if Actions.has_value(action) and response['msg_id'] in self._tasks_in_progress.keys():
        data = self._parse_request(response)
        del self._tasks_in_progress[response['msg_id']]
def reset_connection(self)

Resets the websocket thread.

Added in version: 0.2.0

Expand source code
def reset_connection(self):
    """Resets the websocket thread.

    .. versionadded::0.2.0