diff options
Diffstat (limited to 'mastodon')
-rw-r--r-- | mastodon/Mastodon.py | 87 | ||||
-rw-r--r-- | mastodon/streaming.py | 3 |
2 files changed, 60 insertions, 30 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8b7064f..933a6c5 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py | |||
@@ -1387,45 +1387,45 @@ class Mastodon: | |||
1387 | # Streaming | 1387 | # Streaming |
1388 | ### | 1388 | ### |
1389 | @api_version("1.1.0", "1.4.2") | 1389 | @api_version("1.1.0", "1.4.2") |
1390 | def stream_user(self, listener, async=False): | 1390 | def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): |
1391 | """ | 1391 | """ |
1392 | Streams events that are relevant to the authorized user, i.e. home | 1392 | Streams events that are relevant to the authorized user, i.e. home |
1393 | timeline and notifications. | 1393 | timeline and notifications. |
1394 | """ | 1394 | """ |
1395 | return self.__stream('/api/v1/streaming/user', listener, async=async) | 1395 | return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1396 | 1396 | ||
1397 | @api_version("1.1.0", "1.4.2") | 1397 | @api_version("1.1.0", "1.4.2") |
1398 | def stream_public(self, listener, async=False): | 1398 | def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): |
1399 | """ | 1399 | """ |
1400 | Streams public events. | 1400 | Streams public events. |
1401 | """ | 1401 | """ |
1402 | return self.__stream('/api/v1/streaming/public', listener, async=async) | 1402 | return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1403 | 1403 | ||
1404 | @api_version("1.1.0", "1.4.2") | 1404 | @api_version("1.1.0", "1.4.2") |
1405 | def stream_local(self, listener, async=False): | 1405 | def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): |
1406 | """ | 1406 | """ |
1407 | Streams local public events. | 1407 | Streams local public events. |
1408 | """ | 1408 | """ |
1409 | return self.__stream('/api/v1/streaming/public/local', listener, async=async) | 1409 | return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1410 | 1410 | ||
1411 | @api_version("1.1.0", "1.4.2") | 1411 | @api_version("1.1.0", "1.4.2") |
1412 | def stream_hashtag(self, tag, listener, async=False): | 1412 | def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): |
1413 | """ | 1413 | """ |
1414 | Stream for all public statuses for the hashtag 'tag' seen by the connected | 1414 | Stream for all public statuses for the hashtag 'tag' seen by the connected |
1415 | instance. | 1415 | instance. |
1416 | """ | 1416 | """ |
1417 | if tag.startswith("#"): | 1417 | if tag.startswith("#"): |
1418 | raise MastodonIllegalArgumentError("Tag parameter should omit leading #") | 1418 | raise MastodonIllegalArgumentError("Tag parameter should omit leading #") |
1419 | return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async) | 1419 | return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1420 | 1420 | ||
1421 | @api_version("2.1.0", "2.1.0") | 1421 | @api_version("2.1.0", "2.1.0") |
1422 | def stream_list(self, id, listener, async=False): | 1422 | def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5): |
1423 | """ | 1423 | """ |
1424 | Stream events for the current user, restricted to accounts on the given | 1424 | Stream events for the current user, restricted to accounts on the given |
1425 | list. | 1425 | list. |
1426 | """ | 1426 | """ |
1427 | id = self.__unpack_id(id) | 1427 | id = self.__unpack_id(id) |
1428 | return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async) | 1428 | return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1429 | 1429 | ||
1430 | ### | 1430 | ### |
1431 | # Internal helpers, dragons probably | 1431 | # Internal helpers, dragons probably |
@@ -1667,7 +1667,7 @@ class Mastodon: | |||
1667 | 1667 | ||
1668 | return response | 1668 | return response |
1669 | 1669 | ||
1670 | def __stream(self, endpoint, listener, params={}, async=False): | 1670 | def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5): |
1671 | """ | 1671 | """ |
1672 | Internal streaming API helper. | 1672 | Internal streaming API helper. |
1673 | 1673 | ||
@@ -1695,18 +1695,27 @@ class Mastodon: | |||
1695 | # The streaming server can't handle two slashes in a path, so remove trailing slashes | 1695 | # The streaming server can't handle two slashes in a path, so remove trailing slashes |
1696 | if url[-1] == '/': | 1696 | if url[-1] == '/': |
1697 | url = url[:-1] | 1697 | url = url[:-1] |
1698 | 1698 | ||
1699 | headers = {"Authorization": "Bearer " + self.access_token} | 1699 | # Connect function (called and then potentially passed to async handler) |
1700 | connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) | 1700 | def connect_func(): |
1701 | 1701 | headers = {"Authorization": "Bearer " + self.access_token} | |
1702 | if connection.status_code != 200: | 1702 | connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) |
1703 | raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) | 1703 | |
1704 | 1704 | if connection.status_code != 200: | |
1705 | raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) | ||
1706 | return connection | ||
1707 | connection = connect_func() | ||
1708 | |||
1709 | # Async stream handler | ||
1705 | class __stream_handle(): | 1710 | class __stream_handle(): |
1706 | def __init__(self, connection): | 1711 | def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec): |
1707 | self.closed = False | 1712 | self.closed = False |
1713 | self.running = True | ||
1708 | self.connection = connection | 1714 | self.connection = connection |
1709 | 1715 | self.connect_func = connect_func | |
1716 | self.reconnect_async = reconnect_async | ||
1717 | self.reconnect_async_wait_sec = reconnect_async_wait_sec | ||
1718 | |||
1710 | def close(self): | 1719 | def close(self): |
1711 | self.closed = True | 1720 | self.closed = True |
1712 | self.connection.close() | 1721 | self.connection.close() |
@@ -1716,17 +1725,39 @@ class Mastodon: | |||
1716 | 1725 | ||
1717 | def _threadproc(self): | 1726 | def _threadproc(self): |
1718 | self._thread = threading.current_thread() | 1727 | self._thread = threading.current_thread() |
1719 | with closing(connection) as r: | 1728 | |
1720 | try: | 1729 | # Run until closed or until error if not autoreconnecting |
1721 | listener.handle_stream(r) | 1730 | while self.running: |
1722 | except AttributeError as e: | 1731 | with closing(self.connection) as r: |
1723 | if not self.closed: | 1732 | try: |
1724 | raise e | 1733 | listener.handle_stream(r) |
1734 | except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e: | ||
1735 | if not (self.closed or self.reconnect_async): | ||
1736 | raise e | ||
1737 | else: | ||
1738 | if self.closed: | ||
1739 | self.running = False | ||
1740 | |||
1741 | # Reconnect loop. Try immediately once, then with delays on error. | ||
1742 | if self.reconnect_async and not self.closed: | ||
1743 | connect_success = False | ||
1744 | while not connect_success: | ||
1745 | connect_success = True | ||
1746 | try: | ||
1747 | self.connection = self.connect_func() | ||
1748 | if self.connection.status_code != 200: | ||
1749 | time.sleep(self.reconnect_async_wait_sec) | ||
1750 | connect_success = False | ||
1751 | except: | ||
1752 | time.sleep(self.reconnect_async_wait_sec) | ||
1753 | connect_success = False | ||
1754 | |||
1755 | else: | ||
1756 | self.running = False | ||
1725 | return 0 | 1757 | return 0 |
1726 | 1758 | ||
1727 | handle = __stream_handle(connection) | ||
1728 | |||
1729 | if async: | 1759 | if async: |
1760 | handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) | ||
1730 | t = threading.Thread(args=(), daemon = True, target=handle._threadproc) | 1761 | t = threading.Thread(args=(), daemon = True, target=handle._threadproc) |
1731 | t.start() | 1762 | t.start() |
1732 | return handle | 1763 | return handle |
diff --git a/mastodon/streaming.py b/mastodon/streaming.py index f59b431..1c73f48 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py | |||
@@ -6,7 +6,7 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A | |||
6 | import json | 6 | import json |
7 | import six | 7 | import six |
8 | from mastodon import Mastodon | 8 | from mastodon import Mastodon |
9 | from mastodon.Mastodon import MastodonMalformedEventError | 9 | from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError |
10 | from requests.exceptions import ChunkedEncodingError | 10 | from requests.exceptions import ChunkedEncodingError |
11 | 11 | ||
12 | class StreamListener(object): | 12 | class StreamListener(object): |
@@ -109,7 +109,6 @@ class StreamListener(object): | |||
109 | err | 109 | err |
110 | ) | 110 | ) |
111 | else: | 111 | else: |
112 | # TODO: allow handlers to return/raise to stop streaming cleanly | ||
113 | handler(payload) | 112 | handler(payload) |
114 | 113 | ||
115 | class CallbackStreamListener(StreamListener): | 114 | class CallbackStreamListener(StreamListener): |