From a6a1ddbed1d490a5cf73c315c972ffabe94d8cee Mon Sep 17 00:00:00 2001 From: Chronister Date: Sat, 12 Aug 2017 22:21:37 -0700 Subject: Add async parameter to streaming API calls. If true, calls the streaming API on a separate thread and returns the Response object to the user so they can close it at their discretion. --- mastodon/Mastodon.py | 90 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 26 deletions(-) (limited to 'mastodon') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8aa741a..819d252 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -16,6 +16,7 @@ import dateutil import dateutil.parser import re import copy +import threading class Mastodon: """ @@ -818,46 +819,63 @@ class Mastodon: ### # Streaming ### - def user_stream(self, listener): + def user_stream(self, listener, async=False): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. 'listener' should be a subclass of - StreamListener. + StreamListener which will receive callbacks for incoming events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/user', listener) + return self.__stream('/api/v1/streaming/user', listener, async=async) - def public_stream(self, listener): + def public_stream(self, listener, async=False): """ - Streams public events. 'listener' should be a subclass of - StreamListener. + Streams public events. 'listener' should be a subclass of StreamListener + which will receive callbacks for incoming events. + + If async is False, this method blocks forever. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/public', listener) + return self.__stream('/api/v1/streaming/public', listener, async=async) - def local_stream(self, listener): + def local_stream(self, listener, async=False): """ - Streams local events. 'listener' should be a subclass of - StreamListener. + Streams local events. 'listener' should be a subclass of StreamListener + which will receive callbacks for incoming events. + + If async is False, this method blocks forever. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/public/local', listener) + return self.__stream('/api/v1/streaming/public/local', listener, async=async) - def hashtag_stream(self, tag, listener): + def hashtag_stream(self, tag, listener, async=False): """ Returns all public statuses for the hashtag 'tag'. 'listener' should be - a subclass of StreamListener. + a subclass of StreamListener which will receive callbacks for incoming + events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a requests.Response instance corresponding to the open + connection. The connection may be closed at any time by calling its + close() method. """ - return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}) + return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}, async=async) ### # Internal helpers, dragons probably @@ -1017,19 +1035,39 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params = {}): + + def __stream(self, endpoint, listener, params = {}, async=False): """ Internal streaming API helper. + + Returns the requests.Response instance corresponding to the open websocket + connection. """ headers = {} if self.access_token != None: headers = {'Authorization': 'Bearer ' + self.access_token} - url = self.api_base_url + endpoint - with closing(requests.get(url, headers = headers, data = params, stream = True)) as r: - listener.handle_stream(r.iter_lines()) + connection = requests.get(url, headers = headers, data = params, stream = True) + + def __stream_threadproc(): + with closing(connection) as r: + try: + listener.handle_stream(r.iter_lines()) + except AttributeError as e: + # TODO If the user closes the connection early, requests gets + # confused and throws an AttributeError + pass + return 0 + + if async: + t = threading.Thread(args=(), target=__stream_threadproc) + t.start() + return connection + else: + # Blocking, never returns (can only leave via exception) + return __stream_threadproc() def __generate_params(self, params, exclude = []): """ -- cgit v1.2.3 From 4a5302e03a3ecde244153cde2f79d2ae5a31ff94 Mon Sep 17 00:00:00 2001 From: Chronister Date: Sun, 13 Aug 2017 18:10:04 -0700 Subject: Return a one-off handle instead of the Response object --- mastodon/Mastodon.py | 61 ++++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 28 deletions(-) (limited to 'mastodon') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 819d252..620b303 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -108,9 +108,9 @@ class Mastodon: self._token_expired = datetime.datetime.now() self._refresh_token = None - self.ratelimit_limit = 300 + self.ratelimit_limit = 150 self.ratelimit_reset = time.time() - self.ratelimit_remaining = 300 + self.ratelimit_remaining = 150 self.ratelimit_lastcall = time.time() self.ratelimit_pacefactor = ratelimit_pacefactor @@ -828,9 +828,8 @@ class Mastodon: If async is False, this method blocks forever. If async is True, 'listener' will listen on another thread and this method - will return a requests.Response instance corresponding to the open - connection. The connection may be closed at any time by calling its - close() method. + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ return self.__stream('/api/v1/streaming/user', listener, async=async) @@ -842,9 +841,8 @@ class Mastodon: If async is False, this method blocks forever. If async is True, 'listener' will listen on another thread and this method - will return a requests.Response instance corresponding to the open - connection. The connection may be closed at any time by calling its - close() method. + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ return self.__stream('/api/v1/streaming/public', listener, async=async) @@ -856,9 +854,8 @@ class Mastodon: If async is False, this method blocks forever. If async is True, 'listener' will listen on another thread and this method - will return a requests.Response instance corresponding to the open - connection. The connection may be closed at any time by calling its - close() method. + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ return self.__stream('/api/v1/streaming/public/local', listener, async=async) @@ -871,9 +868,8 @@ class Mastodon: If async is False, this method blocks forever. If async is True, 'listener' will listen on another thread and this method - will return a requests.Response instance corresponding to the open - connection. The connection may be closed at any time by calling its - close() method. + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}, async=async) @@ -1040,8 +1036,8 @@ class Mastodon: """ Internal streaming API helper. - Returns the requests.Response instance corresponding to the open websocket - connection. + Returns a handle to the open connection that the user can close if they + wish to terminate it. """ headers = {} @@ -1051,23 +1047,32 @@ class Mastodon: connection = requests.get(url, headers = headers, data = params, stream = True) - def __stream_threadproc(): - with closing(connection) as r: - try: - listener.handle_stream(r.iter_lines()) - except AttributeError as e: - # TODO If the user closes the connection early, requests gets - # confused and throws an AttributeError - pass - return 0 + class __stream_handle(): + def __init__(self, connection): + self.connection = connection + + def close(self): + self.connection.close() + + def _threadproc(self): + with closing(connection) as r: + try: + listener.handle_stream(r.iter_lines()) + except AttributeError as e: + # Eat AttributeError from requests if user closes early + pass + return 0 + + handle = __stream_handle(connection) if async: - t = threading.Thread(args=(), target=__stream_threadproc) + t = threading.Thread(args=(), target=handle._threadproc) t.start() - return connection + return handle else: # Blocking, never returns (can only leave via exception) - return __stream_threadproc() + with closing(connection) as r: + listener.handle_stream(r.iter_lines()) def __generate_params(self, params, exclude = []): """ -- cgit v1.2.3