From a6a1ddbed1d490a5cf73c315c972ffabe94d8cee Mon Sep 17 00:00:00 2001 From: Chronister Date: Sat, 12 Aug 2017 22:21:37 -0700 Subject: Add async parameter to streaming API calls. If true, calls the streaming API on a separate thread and returns the Response object to the user so they can close it at their discretion. --- mastodon/Mastodon.py | 90 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 26 deletions(-) (limited to 'mastodon') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8aa741a..819d252 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -16,6 +16,7 @@ import dateutil import dateutil.parser import re import copy +import threading class Mastodon: """ @@ -818,46 +819,63 @@ class Mastodon: ### # Streaming ### - def user_stream(self, listener): + def user_stream(self, listener, async=False): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. 'listener' should be a subclass of - StreamListener. + StreamListener which will receive callbacks for incoming events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/user', listener) + return self.__stream('/api/v1/streaming/user', listener, async=async) - def public_stream(self, listener): + def public_stream(self, listener, async=False): """ - Streams public events. 'listener' should be a subclass of - StreamListener. + Streams public events. 'listener' should be a subclass of StreamListener + which will receive callbacks for incoming events. + + If async is False, this method blocks forever. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/public', listener) + return self.__stream('/api/v1/streaming/public', listener, async=async) - def local_stream(self, listener): + def local_stream(self, listener, async=False): """ - Streams local events. 'listener' should be a subclass of - StreamListener. + Streams local events. 'listener' should be a subclass of StreamListener + which will receive callbacks for incoming events. + + If async is False, this method blocks forever. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/public/local', listener) + return self.__stream('/api/v1/streaming/public/local', listener, async=async) - def hashtag_stream(self, tag, listener): + def hashtag_stream(self, tag, listener, async=False): """ Returns all public statuses for the hashtag 'tag'. 'listener' should be - a subclass of StreamListener. + a subclass of StreamListener which will receive callbacks for incoming + events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}) + return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}, async=async) ### # Internal helpers, dragons probably @@ -1017,19 +1035,39 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params = {}): + + def __stream(self, endpoint, listener, params = {}, async=False): """ Internal streaming API helper. + + Returns the requests.Response instance corresponding to the open websocket + connection. """ headers = {} if self.access_token != None: headers = {'Authorization': 'Bearer ' + self.access_token} - url = self.api_base_url + endpoint - with closing(requests.get(url, headers = headers, data = params, stream = True)) as r: - listener.handle_stream(r.iter_lines()) + connection = requests.get(url, headers = headers, data = params, stream = True) + + def __stream_threadproc(): + with closing(connection) as r: + try: + listener.handle_stream(r.iter_lines()) + except AttributeError as e: + # TODO If the user closes the connection early, requests gets + # confused and throws an AttributeError + pass + return 0 + + if async: + t = threading.Thread(args=(), target=__stream_threadproc) + t.start() + return connection + else: + # Blocking, never returns (can only leave via exception) + return __stream_threadproc() def __generate_params(self, params, exclude = []): """ -- cgit v1.2.3