From 86ec5d7eca078e125837f29dcb7a92030e4ae1b4 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 20 Feb 2018 14:04:17 +0100 Subject: Re-raise Chunked Encoding Errors as Network Errors --- mastodon/streaming.py | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/mastodon/streaming.py b/mastodon/streaming.py index d55ad54..f59b431 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py @@ -7,6 +7,7 @@ import json import six from mastodon import Mastodon from mastodon.Mastodon import MastodonMalformedEventError +from requests.exceptions import ChunkedEncodingError class StreamListener(object): """Callbacks for the streaming API. Create a subclass, override the on_xxx @@ -43,25 +44,31 @@ class StreamListener(object): """ event = {} line_buffer = bytearray() - for chunk in response.iter_content(chunk_size = 1): - if chunk: - if chunk == b'\n': - try: - line = line_buffer.decode('utf-8') - except UnicodeDecodeError as err: - six.raise_from( - MastodonMalformedEventError("Malformed UTF-8"), - err - ) - if line == '': - self._dispatch(event) - event = {} + try: + for chunk in response.iter_content(chunk_size = 1): + if chunk: + if chunk == b'\n': + try: + line = line_buffer.decode('utf-8') + except UnicodeDecodeError as err: + six.raise_from( + MastodonMalformedEventError("Malformed UTF-8"), + err + ) + if line == '': + self._dispatch(event) + event = {} + else: + event = self._parse_line(line, event) + line_buffer = bytearray() else: - event = self._parse_line(line, event) - line_buffer = bytearray() - else: - line_buffer.extend(chunk) - + line_buffer.extend(chunk) + except ChunkedEncodingError as err: + six.raise_from( + MastodonNetworkError("Server ceased communication."), + err + ) + def _parse_line(self, line, event): if line.startswith(':'): self.handle_heartbeat() -- cgit v1.2.3 From d0ae9dcd055e3bdc96a5ab817d14cda012516297 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:35:09 +0200 Subject: Add async autoreconnect --- mastodon/Mastodon.py | 87 ++++++++++++++++++++++++++++++++++----------------- mastodon/streaming.py | 3 +- 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8b7064f..933a6c5 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1387,45 +1387,45 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False): + def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. """ - return self.__stream('/api/v1/streaming/user', listener, async=async) + return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False): + def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams public events. """ - return self.__stream('/api/v1/streaming/public', listener, async=async) + return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False): + def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams local public events. """ - return self.__stream('/api/v1/streaming/public/local', listener, async=async) + return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False): + def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. """ if tag.startswith("#"): raise MastodonIllegalArgumentError("Tag parameter should omit leading #") - return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async) + return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False): + def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Stream events for the current user, restricted to accounts on the given list. """ id = self.__unpack_id(id) - return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async) + return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) ### # Internal helpers, dragons probably @@ -1667,7 +1667,7 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}, async=False): + def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Internal streaming API helper. @@ -1695,18 +1695,27 @@ class Mastodon: # The streaming server can't handle two slashes in a path, so remove trailing slashes if url[-1] == '/': url = url[:-1] - - headers = {"Authorization": "Bearer " + self.access_token} - connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) - - if connection.status_code != 200: - raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) - + + # Connect function (called and then potentially passed to async handler) + def connect_func(): + headers = {"Authorization": "Bearer " + self.access_token} + connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) + + if connection.status_code != 200: + raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) + return connection + connection = connect_func() + + # Async stream handler class __stream_handle(): - def __init__(self, connection): + def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec): self.closed = False + self.running = True self.connection = connection - + self.connect_func = connect_func + self.reconnect_async = reconnect_async + self.reconnect_async_wait_sec = reconnect_async_wait_sec + def close(self): self.closed = True self.connection.close() @@ -1716,17 +1725,39 @@ class Mastodon: def _threadproc(self): self._thread = threading.current_thread() - with closing(connection) as r: - try: - listener.handle_stream(r) - except AttributeError as e: - if not self.closed: - raise e + + # Run until closed or until error if not autoreconnecting + while self.running: + with closing(self.connection) as r: + try: + listener.handle_stream(r) + except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e: + if not (self.closed or self.reconnect_async): + raise e + else: + if self.closed: + self.running = False + + # Reconnect loop. Try immediately once, then with delays on error. + if self.reconnect_async and not self.closed: + connect_success = False + while not connect_success: + connect_success = True + try: + self.connection = self.connect_func() + if self.connection.status_code != 200: + time.sleep(self.reconnect_async_wait_sec) + connect_success = False + except: + time.sleep(self.reconnect_async_wait_sec) + connect_success = False + + else: + self.running = False return 0 - handle = __stream_handle(connection) - if async: + handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) t = threading.Thread(args=(), daemon = True, target=handle._threadproc) t.start() return handle diff --git a/mastodon/streaming.py b/mastodon/streaming.py index f59b431..1c73f48 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py @@ -6,7 +6,7 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A import json import six from mastodon import Mastodon -from mastodon.Mastodon import MastodonMalformedEventError +from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError from requests.exceptions import ChunkedEncodingError class StreamListener(object): @@ -109,7 +109,6 @@ class StreamListener(object): err ) else: - # TODO: allow handlers to return/raise to stop streaming cleanly handler(payload) class CallbackStreamListener(StreamListener): -- cgit v1.2.3 From 6d4490295a7371da1939efd612f919b6ce6af3d3 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:55:31 +0200 Subject: Nicen up reconnect waiting --- mastodon/Mastodon.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 933a6c5..595b79e 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -89,6 +89,7 @@ class Mastodon: """ __DEFAULT_BASE_URL = 'https://mastodon.social' __DEFAULT_TIMEOUT = 300 + __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5 __SUPPORTED_MASTODON_VERSION = "2.2.0" ### @@ -1387,7 +1388,7 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. @@ -1395,21 +1396,21 @@ class Mastodon: return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams public events. """ return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams local public events. """ return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. @@ -1419,7 +1420,7 @@ class Mastodon: return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream events for the current user, restricted to accounts on the given list. -- cgit v1.2.3 From eb336a30c2ef04c2b0f66727de1bc585ad96e2da Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:59:45 +0200 Subject: Add is_receiving: --- mastodon/Mastodon.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 595b79e..3706008 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1716,6 +1716,7 @@ class Mastodon: self.connect_func = connect_func self.reconnect_async = reconnect_async self.reconnect_async_wait_sec = reconnect_async_wait_sec + self.reconnecting = False def close(self): self.closed = True @@ -1724,6 +1725,12 @@ class Mastodon: def is_alive(self): return self._thread.is_alive() + def is_receiving(self): + if self.closed or not self.running or self.reconnecting or not self.is_alive(): + return False + else: + return True + def _threadproc(self): self._thread = threading.current_thread() @@ -1741,6 +1748,7 @@ class Mastodon: # Reconnect loop. Try immediately once, then with delays on error. if self.reconnect_async and not self.closed: + self.reconnecting = True connect_success = False while not connect_success: connect_success = True @@ -1752,7 +1760,7 @@ class Mastodon: except: time.sleep(self.reconnect_async_wait_sec) connect_success = False - + self.reconnecting = False else: self.running = False return 0 -- cgit v1.2.3 From 400faadc0deeb352d78291857cd6a2c8b8c528d3 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 15:10:40 +0200 Subject: Document new async behaviour a bit --- docs/index.rst | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 5a1254d..f463156 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -725,13 +725,17 @@ Streaming --------- These functions allow access to the streaming API. -If async is False, these methods block forever (or until an -exception is raised). +If `async` is False, these methods block forever (or until an exception is raised). -If async is True, the listener will listen on another thread and these methods -will return a handle corresponding to the open connection. The -connection may be closed at any time by calling the handles close() method, and the -status of the connection can be verified calling is_alive() on the handle. +If `async` is True, the listener will listen on another thread and these methods +will return a handle corresponding to the open connection. If, in addition, `async_reconnect` is True, +the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting +`async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made +to "catch up" - toots made while the connection is broken will not be received. + +The connection may be closed at any time by calling the handles close() method. The +current status of the handler thread can be checked with the handles is_alive() function, +and the streaming status can be checked by calling is_receiving(). The streaming functions take instances of `StreamListener` as the `listener` parameter. A `CallbackStreamListener` class that allows you to specify function callbacks -- cgit v1.2.3 From dd5f4ae08af21e5965b3670231ac92c5503f58cc Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 15:22:07 +0200 Subject: Document streaming error handling --- docs/index.rst | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/index.rst b/docs/index.rst index f463156..4de0f6e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -725,7 +725,7 @@ Streaming --------- These functions allow access to the streaming API. -If `async` is False, these methods block forever (or until an exception is raised). +If `async` is False, these methods block forever (or until an error is encountered). If `async` is True, the listener will listen on another thread and these methods will return a handle corresponding to the open connection. If, in addition, `async_reconnect` is True, @@ -741,6 +741,10 @@ The streaming functions take instances of `StreamListener` as the `listener` par A `CallbackStreamListener` class that allows you to specify function callbacks directly is included for convenience. +When in not-async mode or async mode without async_reconnect, the stream functions may raise +various exceptions: `MastodonMalformedEventError` if a received event cannot be parsed and +`MastodonNetworkError` if any connection problems occur. + .. automethod:: Mastodon.stream_user .. automethod:: Mastodon.stream_public .. automethod:: Mastodon.stream_local -- cgit v1.2.3 From d5efea72387b9374fa03cb4c282ed76e74a68447 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 15:27:18 +0200 Subject: Make imports better (fixes #129) --- mastodon/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mastodon/__init__.py b/mastodon/__init__.py index fdf776d..787d4e8 100644 --- a/mastodon/__init__.py +++ b/mastodon/__init__.py @@ -1,4 +1,4 @@ -from mastodon.Mastodon import Mastodon +from mastodon.Mastodon import Mastodon, MastodonError, MastodonVersionError, MastodonIllegalArgumentError, MastodonIOError, MastodonFileNotFoundError, MastodonNetworkError, MastodonAPIError, MastodonNotFoundError, MastodonUnauthorizedError, MastodonRatelimitError, MastodonMalformedEventError from mastodon.streaming import StreamListener, CallbackStreamListener -__all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener'] +__all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener', 'MastodonError', 'MastodonVersionError', 'MastodonIllegalArgumentError', 'MastodonIOError', 'MastodonFileNotFoundError', 'MastodonNetworkError', 'MastodonAPIError', 'MastodonNotFoundError', 'MastodonUnauthorizedError', 'MastodonRatelimitError', 'MastodonMalformedEventError'] -- cgit v1.2.3 From 864c83fa2fe7e09125a24f4a12acb4b1e285bc02 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 15:38:21 +0200 Subject: Rename async to run_async, fixes #120 --- mastodon/Mastodon.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 3706008..5f1edb3 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1388,45 +1388,45 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_user(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. """ - return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_public(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams public events. """ - return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_local(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams local public events. """ - return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_hashtag(self, tag, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. """ if tag.startswith("#"): raise MastodonIllegalArgumentError("Tag parameter should omit leading #") - return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_list(self, id, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream events for the current user, restricted to accounts on the given list. """ id = self.__unpack_id(id) - return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) ### # Internal helpers, dragons probably @@ -1668,7 +1668,7 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def __stream(self, endpoint, listener, params={}, run_async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Internal streaming API helper. @@ -1765,7 +1765,7 @@ class Mastodon: self.running = False return 0 - if async: + if run_async: handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) t = threading.Thread(args=(), daemon = True, target=handle._threadproc) t.start() -- cgit v1.2.3