aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenz Diener <[email protected]>2018-04-17 14:35:09 +0200
committerLorenz Diener <[email protected]>2018-04-17 14:35:09 +0200
commitd0ae9dcd055e3bdc96a5ab817d14cda012516297 (patch)
treeceeb6ef7909f3dd4e217124cb0778208aabf5c06
parent86ec5d7eca078e125837f29dcb7a92030e4ae1b4 (diff)
downloadmastodon.py-d0ae9dcd055e3bdc96a5ab817d14cda012516297.tar.gz
Add async autoreconnect
-rw-r--r--mastodon/Mastodon.py87
-rw-r--r--mastodon/streaming.py3
2 files changed, 60 insertions, 30 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index 8b7064f..933a6c5 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -1387,45 +1387,45 @@ class Mastodon:
1387 # Streaming 1387 # Streaming
1388 ### 1388 ###
1389 @api_version("1.1.0", "1.4.2") 1389 @api_version("1.1.0", "1.4.2")
1390 def stream_user(self, listener, async=False): 1390 def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
1391 """ 1391 """
1392 Streams events that are relevant to the authorized user, i.e. home 1392 Streams events that are relevant to the authorized user, i.e. home
1393 timeline and notifications. 1393 timeline and notifications.
1394 """ 1394 """
1395 return self.__stream('/api/v1/streaming/user', listener, async=async) 1395 return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1396 1396
1397 @api_version("1.1.0", "1.4.2") 1397 @api_version("1.1.0", "1.4.2")
1398 def stream_public(self, listener, async=False): 1398 def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
1399 """ 1399 """
1400 Streams public events. 1400 Streams public events.
1401 """ 1401 """
1402 return self.__stream('/api/v1/streaming/public', listener, async=async) 1402 return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1403 1403
1404 @api_version("1.1.0", "1.4.2") 1404 @api_version("1.1.0", "1.4.2")
1405 def stream_local(self, listener, async=False): 1405 def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
1406 """ 1406 """
1407 Streams local public events. 1407 Streams local public events.
1408 """ 1408 """
1409 return self.__stream('/api/v1/streaming/public/local', listener, async=async) 1409 return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1410 1410
1411 @api_version("1.1.0", "1.4.2") 1411 @api_version("1.1.0", "1.4.2")
1412 def stream_hashtag(self, tag, listener, async=False): 1412 def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
1413 """ 1413 """
1414 Stream for all public statuses for the hashtag 'tag' seen by the connected 1414 Stream for all public statuses for the hashtag 'tag' seen by the connected
1415 instance. 1415 instance.
1416 """ 1416 """
1417 if tag.startswith("#"): 1417 if tag.startswith("#"):
1418 raise MastodonIllegalArgumentError("Tag parameter should omit leading #") 1418 raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
1419 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async) 1419 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1420 1420
1421 @api_version("2.1.0", "2.1.0") 1421 @api_version("2.1.0", "2.1.0")
1422 def stream_list(self, id, listener, async=False): 1422 def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
1423 """ 1423 """
1424 Stream events for the current user, restricted to accounts on the given 1424 Stream events for the current user, restricted to accounts on the given
1425 list. 1425 list.
1426 """ 1426 """
1427 id = self.__unpack_id(id) 1427 id = self.__unpack_id(id)
1428 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async) 1428 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1429 1429
1430 ### 1430 ###
1431 # Internal helpers, dragons probably 1431 # Internal helpers, dragons probably
@@ -1667,7 +1667,7 @@ class Mastodon:
1667 1667
1668 return response 1668 return response
1669 1669
1670 def __stream(self, endpoint, listener, params={}, async=False): 1670 def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
1671 """ 1671 """
1672 Internal streaming API helper. 1672 Internal streaming API helper.
1673 1673
@@ -1695,18 +1695,27 @@ class Mastodon:
1695 # The streaming server can't handle two slashes in a path, so remove trailing slashes 1695 # The streaming server can't handle two slashes in a path, so remove trailing slashes
1696 if url[-1] == '/': 1696 if url[-1] == '/':
1697 url = url[:-1] 1697 url = url[:-1]
1698 1698
1699 headers = {"Authorization": "Bearer " + self.access_token} 1699 # Connect function (called and then potentially passed to async handler)
1700 connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) 1700 def connect_func():
1701 1701 headers = {"Authorization": "Bearer " + self.access_token}
1702 if connection.status_code != 200: 1702 connection = requests.get(url + endpoint, headers = headers, data = params, stream = True)
1703 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) 1703
1704 1704 if connection.status_code != 200:
1705 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
1706 return connection
1707 connection = connect_func()
1708
1709 # Async stream handler
1705 class __stream_handle(): 1710 class __stream_handle():
1706 def __init__(self, connection): 1711 def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec):
1707 self.closed = False 1712 self.closed = False
1713 self.running = True
1708 self.connection = connection 1714 self.connection = connection
1709 1715 self.connect_func = connect_func
1716 self.reconnect_async = reconnect_async
1717 self.reconnect_async_wait_sec = reconnect_async_wait_sec
1718
1710 def close(self): 1719 def close(self):
1711 self.closed = True 1720 self.closed = True
1712 self.connection.close() 1721 self.connection.close()
@@ -1716,17 +1725,39 @@ class Mastodon:
1716 1725
1717 def _threadproc(self): 1726 def _threadproc(self):
1718 self._thread = threading.current_thread() 1727 self._thread = threading.current_thread()
1719 with closing(connection) as r: 1728
1720 try: 1729 # Run until closed or until error if not autoreconnecting
1721 listener.handle_stream(r) 1730 while self.running:
1722 except AttributeError as e: 1731 with closing(self.connection) as r:
1723 if not self.closed: 1732 try:
1724 raise e 1733 listener.handle_stream(r)
1734 except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e:
1735 if not (self.closed or self.reconnect_async):
1736 raise e
1737 else:
1738 if self.closed:
1739 self.running = False
1740
1741 # Reconnect loop. Try immediately once, then with delays on error.
1742 if self.reconnect_async and not self.closed:
1743 connect_success = False
1744 while not connect_success:
1745 connect_success = True
1746 try:
1747 self.connection = self.connect_func()
1748 if self.connection.status_code != 200:
1749 time.sleep(self.reconnect_async_wait_sec)
1750 connect_success = False
1751 except:
1752 time.sleep(self.reconnect_async_wait_sec)
1753 connect_success = False
1754
1755 else:
1756 self.running = False
1725 return 0 1757 return 0
1726 1758
1727 handle = __stream_handle(connection)
1728
1729 if async: 1759 if async:
1760 handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec)
1730 t = threading.Thread(args=(), daemon = True, target=handle._threadproc) 1761 t = threading.Thread(args=(), daemon = True, target=handle._threadproc)
1731 t.start() 1762 t.start()
1732 return handle 1763 return handle
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
index f59b431..1c73f48 100644
--- a/mastodon/streaming.py
+++ b/mastodon/streaming.py
@@ -6,7 +6,7 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A
6import json 6import json
7import six 7import six
8from mastodon import Mastodon 8from mastodon import Mastodon
9from mastodon.Mastodon import MastodonMalformedEventError 9from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError
10from requests.exceptions import ChunkedEncodingError 10from requests.exceptions import ChunkedEncodingError
11 11
12class StreamListener(object): 12class StreamListener(object):
@@ -109,7 +109,6 @@ class StreamListener(object):
109 err 109 err
110 ) 110 )
111 else: 111 else:
112 # TODO: allow handlers to return/raise to stop streaming cleanly
113 handler(payload) 112 handler(payload)
114 113
115class CallbackStreamListener(StreamListener): 114class CallbackStreamListener(StreamListener):
Powered by cgit v1.2.3 (git 2.41.0)