From d0ae9dcd055e3bdc96a5ab817d14cda012516297 Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 17 Apr 2018 14:35:09 +0200 Subject: Add async autoreconnect --- mastodon/Mastodon.py | 87 ++++++++++++++++++++++++++++++++++----------------- mastodon/streaming.py | 3 +- 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8b7064f..933a6c5 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1387,45 +1387,45 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, async=False): + def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. """ - return self.__stream('/api/v1/streaming/user', listener, async=async) + return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, async=False): + def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams public events. """ - return self.__stream('/api/v1/streaming/public', listener, async=async) + return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, async=False): + def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Streams local public events. """ - return self.__stream('/api/v1/streaming/public/local', listener, async=async) + return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, async=False): + def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. """ if tag.startswith("#"): raise MastodonIllegalArgumentError("Tag parameter should omit leading #") - return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async) + return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, async=False): + def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Stream events for the current user, restricted to accounts on the given list. """ id = self.__unpack_id(id) - return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async) + return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) ### # Internal helpers, dragons probably @@ -1667,7 +1667,7 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}, async=False): + def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5): """ Internal streaming API helper. @@ -1695,18 +1695,27 @@ class Mastodon: # The streaming server can't handle two slashes in a path, so remove trailing slashes if url[-1] == '/': url = url[:-1] - - headers = {"Authorization": "Bearer " + self.access_token} - connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) - - if connection.status_code != 200: - raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) - + + # Connect function (called and then potentially passed to async handler) + def connect_func(): + headers = {"Authorization": "Bearer " + self.access_token} + connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) + + if connection.status_code != 200: + raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) + return connection + connection = connect_func() + + # Async stream handler class __stream_handle(): - def __init__(self, connection): + def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec): self.closed = False + self.running = True self.connection = connection - + self.connect_func = connect_func + self.reconnect_async = reconnect_async + self.reconnect_async_wait_sec = reconnect_async_wait_sec + def close(self): self.closed = True self.connection.close() @@ -1716,17 +1725,39 @@ class Mastodon: def _threadproc(self): self._thread = threading.current_thread() - with closing(connection) as r: - try: - listener.handle_stream(r) - except AttributeError as e: - if not self.closed: - raise e + + # Run until closed or until error if not autoreconnecting + while self.running: + with closing(self.connection) as r: + try: + listener.handle_stream(r) + except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e: + if not (self.closed or self.reconnect_async): + raise e + else: + if self.closed: + self.running = False + + # Reconnect loop. Try immediately once, then with delays on error. + if self.reconnect_async and not self.closed: + connect_success = False + while not connect_success: + connect_success = True + try: + self.connection = self.connect_func() + if self.connection.status_code != 200: + time.sleep(self.reconnect_async_wait_sec) + connect_success = False + except: + time.sleep(self.reconnect_async_wait_sec) + connect_success = False + + else: + self.running = False return 0 - handle = __stream_handle(connection) - if async: + handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) t = threading.Thread(args=(), daemon = True, target=handle._threadproc) t.start() return handle diff --git a/mastodon/streaming.py b/mastodon/streaming.py index f59b431..1c73f48 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py @@ -6,7 +6,7 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A import json import six from mastodon import Mastodon -from mastodon.Mastodon import MastodonMalformedEventError +from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError from requests.exceptions import ChunkedEncodingError class StreamListener(object): @@ -109,7 +109,6 @@ class StreamListener(object): err ) else: - # TODO: allow handlers to return/raise to stop streaming cleanly handler(payload) class CallbackStreamListener(StreamListener): -- cgit v1.2.3