From d0ae9dcd055e3bdc96a5ab817d14cda012516297 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:35:09 +0200 Subject: Add async autoreconnect --- mastodon/Mastodon.py | 87 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 28 deletions(-) (limited to 'mastodon/Mastodon.py') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8b7064f..933a6c5 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1387,45 +1387,45 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False): + def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. """ - return self.__stream('/api/v1/streaming/user', listener, async=async) + return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False): + def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams public events. """ - return self.__stream('/api/v1/streaming/public', listener, async=async) + return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False): + def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams local public events. """ - return self.__stream('/api/v1/streaming/public/local', listener, async=async) + return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False): + def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. """ if tag.startswith("#"): raise MastodonIllegalArgumentError("Tag parameter should omit leading #") - return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async) + return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False): + def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Stream events for the current user, restricted to accounts on the given list. """ id = self.__unpack_id(id) - return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async) + return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) ### # Internal helpers, dragons probably @@ -1667,7 +1667,7 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}, async=False): + def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Internal streaming API helper. @@ -1695,18 +1695,27 @@ class Mastodon: # The streaming server can't handle two slashes in a path, so remove trailing slashes if url[-1] == '/': url = url[:-1] - - headers = {"Authorization": "Bearer " + self.access_token} - connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) - - if connection.status_code != 200: - raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) - + + # Connect function (called and then potentially passed to async handler) + def connect_func(): + headers = {"Authorization": "Bearer " + self.access_token} + connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) + + if connection.status_code != 200: + raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) + return connection + connection = connect_func() + + # Async stream handler class __stream_handle(): - def __init__(self, connection): + def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec): self.closed = False + self.running = True self.connection = connection - + self.connect_func = connect_func + self.reconnect_async = reconnect_async + self.reconnect_async_wait_sec = reconnect_async_wait_sec + def close(self): self.closed = True self.connection.close() @@ -1716,17 +1725,39 @@ class Mastodon: def _threadproc(self): self._thread = threading.current_thread() - with closing(connection) as r: - try: - listener.handle_stream(r) - except AttributeError as e: - if not self.closed: - raise e + + # Run until closed or until error if not autoreconnecting + while self.running: + with closing(self.connection) as r: + try: + listener.handle_stream(r) + except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e: + if not (self.closed or self.reconnect_async): + raise e + else: + if self.closed: + self.running = False + + # Reconnect loop. Try immediately once, then with delays on error. + if self.reconnect_async and not self.closed: + connect_success = False + while not connect_success: + connect_success = True + try: + self.connection = self.connect_func() + if self.connection.status_code != 200: + time.sleep(self.reconnect_async_wait_sec) + connect_success = False + except: + time.sleep(self.reconnect_async_wait_sec) + connect_success = False + + else: + self.running = False return 0 - handle = __stream_handle(connection) - if async: + handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) t = threading.Thread(args=(), daemon = True, target=handle._threadproc) t.start() return handle -- cgit v1.2.3 From 6d4490295a7371da1939efd612f919b6ce6af3d3 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:55:31 +0200 Subject: Nicen up reconnect waiting --- mastodon/Mastodon.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'mastodon/Mastodon.py') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 933a6c5..595b79e 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -89,6 +89,7 @@ class Mastodon: """ __DEFAULT_BASE_URL = 'https://mastodon.social' __DEFAULT_TIMEOUT = 300 + __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5 __SUPPORTED_MASTODON_VERSION = "2.2.0" ### @@ -1387,7 +1388,7 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. @@ -1395,21 +1396,21 @@ class Mastodon: return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams public events. """ return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams local public events. """ return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. @@ -1419,7 +1420,7 @@ class Mastodon: return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream events for the current user, restricted to accounts on the given list. -- cgit v1.2.3 From eb336a30c2ef04c2b0f66727de1bc585ad96e2da Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:59:45 +0200 Subject: Add is_receiving: --- mastodon/Mastodon.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'mastodon/Mastodon.py') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 595b79e..3706008 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1716,6 +1716,7 @@ class Mastodon: self.connect_func = connect_func self.reconnect_async = reconnect_async self.reconnect_async_wait_sec = reconnect_async_wait_sec + self.reconnecting = False def close(self): self.closed = True @@ -1724,6 +1725,12 @@ class Mastodon: def is_alive(self): return self._thread.is_alive() + def is_receiving(self): + if self.closed or not self.running or self.reconnecting or not self.is_alive(): + return False + else: + return True + def _threadproc(self): self._thread = threading.current_thread() @@ -1741,6 +1748,7 @@ class Mastodon: # Reconnect loop. Try immediately once, then with delays on error. if self.reconnect_async and not self.closed: + self.reconnecting = True connect_success = False while not connect_success: connect_success = True @@ -1752,7 +1760,7 @@ class Mastodon: except: time.sleep(self.reconnect_async_wait_sec) connect_success = False - + self.reconnecting = False else: self.running = False return 0 -- cgit v1.2.3 From 864c83fa2fe7e09125a24f4a12acb4b1e285bc02 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 15:38:21 +0200 Subject: Rename async to run_async, fixes #120 --- mastodon/Mastodon.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'mastodon/Mastodon.py') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 3706008..5f1edb3 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1388,45 +1388,45 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_user(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. """ - return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_public(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams public events. """ - return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_local(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams local public events. """ - return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_hashtag(self, tag, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. """ if tag.startswith("#"): raise MastodonIllegalArgumentError("Tag parameter should omit leading #") - return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_list(self, id, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream events for the current user, restricted to accounts on the given list. """ id = self.__unpack_id(id) - return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) ### # Internal helpers, dragons probably @@ -1668,7 +1668,7 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def __stream(self, endpoint, listener, params={}, run_async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Internal streaming API helper. @@ -1765,7 +1765,7 @@ class Mastodon: self.running = False return 0 - if async: + if run_async: handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) t = threading.Thread(args=(), daemon = True, target=handle._threadproc) t.start() -- cgit v1.2.3