From 57520beea7ef83311be10be8acab4b34d2e10bf5 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 5 Jun 2018 17:37:11 +0200 Subject: Improve on_abort handler --- docs/index.rst | 3 ++- mastodon/streaming.py | 60 +++++++++++++++++++++++++++++++++++---------------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 56e0756..fc8c003 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -815,7 +815,8 @@ will return a handle corresponding to the open connection. If, in addition, `asy the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting `async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made to "catch up" - events created while the connection is broken will not be received. If you need to make -sure to get absolutely all notifications / deletes / toots, you will have to do that manually. +sure to get absolutely all notifications / deletes / toots, you will have to do that manually, e.g. +using the `on_abort` handler to fill in events since the last received one and then reconnecting. The connection may be closed at any time by calling the handles close() method. The current status of the handler thread can be checked with the handles is_alive() function, diff --git a/mastodon/streaming.py b/mastodon/streaming.py index 8c3ec19..65ec30a 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py @@ -25,8 +25,15 @@ class StreamListener(object): describing the notification.""" pass - def on_abort(self): - """There was a connection error or read timeout.""" + def on_abort(self, err): + """There was a connection error, read timeout or other error fatal to + the streaming connection. The exception object about to be raised + is passed to this function for reference. + + Note that the exception will be raised properly once you return from this + function, so if you are using this handler to reconnect, either never + return or start a thread and then catch and ignore the exception. + """ pass def on_delete(self, status_id): @@ -55,8 +62,10 @@ class StreamListener(object): try: line = line_buffer.decode('utf-8') except UnicodeDecodeError as err: + exception = MastodonMalformedEventError("Malformed UTF-8") + self.on_abort(exception) six.raise_from( - MastodonMalformedEventError("Malformed UTF-8"), + exception, err ) if line == '': @@ -68,15 +77,17 @@ class StreamListener(object): else: line_buffer.extend(chunk) except ChunkedEncodingError as err: - self.on_abort() + exception = MastodonNetworkError("Server ceased communication.") + self.on_abort(exception) six.raise_from( - MastodonNetworkError("Server ceased communication."), + exception, err ) except MastodonReadTimeout as err: - self.on_abort() + exception = MastodonReadTimeout("Timed out while reading from server."), + self.on_abort(exception) six.raise_from( - MastodonReadTimeout("Timed out while reading from server."), + exception, err ) @@ -84,7 +95,12 @@ class StreamListener(object): if line.startswith(':'): self.handle_heartbeat() else: - key, value = line.split(': ', 1) + try: + key, value = line.split(': ', 1) + except: + exception = MastodonMalformedEventError("Malformed event.") + self.on_abort(exception) + raise exception # According to the MDN spec, repeating the 'data' key # represents a newline(!) if key in event: @@ -99,24 +115,30 @@ class StreamListener(object): data = event['data'] payload = json.loads(data, object_hook = Mastodon._Mastodon__json_hooks) except KeyError as err: - six.raise_from( - MastodonMalformedEventError('Missing field', err.args[0], event), - err - ) + exception = MastodonMalformedEventError('Missing field', err.args[0], event) + self.on_abort(exception) + six.raise_from( + exception, + err + ) except ValueError as err: - # py2: plain ValueError - # py3: json.JSONDecodeError, a subclass of ValueError - six.raise_from( - MastodonMalformedEventError('Bad JSON', data), - err - ) + # py2: plain ValueError + # py3: json.JSONDecodeError, a subclass of ValueError + exception = MastodonMalformedEventError('Bad JSON', data) + self.on_abort(exception) + six.raise_from( + exception, + err + ) handler_name = 'on_' + name try: handler = getattr(self, handler_name) except AttributeError as err: + exception = MastodonMalformedEventError('Bad event type', name) + self.on_abort(exception) six.raise_from( - MastodonMalformedEventError('Bad event type', name), + exception, err ) else: -- cgit v1.2.3