diff options
author | Lorenz Diener <[email protected]> | 2018-04-17 16:06:00 +0200 |
---|---|---|
committer | GitHub <[email protected]> | 2018-04-17 16:06:00 +0200 |
commit | 2afc50c803177fc282fac1a19fcd40e175d55f2d (patch) | |
tree | 7ce50a932d74a3ad902b6b4b4613c18b33937bb5 /mastodon | |
parent | 06a7a875fe2705044b98f3ef285944b5513be7de (diff) | |
parent | 864c83fa2fe7e09125a24f4a12acb4b1e285bc02 (diff) | |
download | mastodon.py-2afc50c803177fc282fac1a19fcd40e175d55f2d.tar.gz |
Merge branch 'master' into stream-timeout
Diffstat (limited to 'mastodon')
-rw-r--r-- | mastodon/Mastodon.py | 96 | ||||
-rw-r--r-- | mastodon/__init__.py | 4 | ||||
-rw-r--r-- | mastodon/streaming.py | 46 |
3 files changed, 96 insertions, 50 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index ea54649..5461661 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py | |||
@@ -90,6 +90,7 @@ class Mastodon: | |||
90 | __DEFAULT_BASE_URL = 'https://mastodon.social' | 90 | __DEFAULT_BASE_URL = 'https://mastodon.social' |
91 | __DEFAULT_TIMEOUT = 300 | 91 | __DEFAULT_TIMEOUT = 300 |
92 | __DEFAULT_STREAM_TIMEOUT = 300 | 92 | __DEFAULT_STREAM_TIMEOUT = 300 |
93 | __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5 | ||
93 | __SUPPORTED_MASTODON_VERSION = "2.2.0" | 94 | __SUPPORTED_MASTODON_VERSION = "2.2.0" |
94 | 95 | ||
95 | ### | 96 | ### |
@@ -1388,45 +1389,45 @@ class Mastodon: | |||
1388 | # Streaming | 1389 | # Streaming |
1389 | ### | 1390 | ### |
1390 | @api_version("1.1.0", "1.4.2") | 1391 | @api_version("1.1.0", "1.4.2") |
1391 | def stream_user(self, listener, async=False): | 1392 | def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): |
1392 | """ | 1393 | """ |
1393 | Streams events that are relevant to the authorized user, i.e. home | 1394 | Streams events that are relevant to the authorized user, i.e. home |
1394 | timeline and notifications. | 1395 | timeline and notifications. |
1395 | """ | 1396 | """ |
1396 | return self.__stream('/api/v1/streaming/user', listener, async=async) | 1397 | return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1397 | 1398 | ||
1398 | @api_version("1.1.0", "1.4.2") | 1399 | @api_version("1.1.0", "1.4.2") |
1399 | def stream_public(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): | 1400 | def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): |
1400 | """ | 1401 | """ |
1401 | Streams public events. | 1402 | Streams public events. |
1402 | """ | 1403 | """ |
1403 | return self.__stream('/api/v1/streaming/public', listener, async=async, timeout=timeout) | 1404 | return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1404 | 1405 | ||
1405 | @api_version("1.1.0", "1.4.2") | 1406 | @api_version("1.1.0", "1.4.2") |
1406 | def stream_local(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): | 1407 | def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): |
1407 | """ | 1408 | """ |
1408 | Streams local public events. | 1409 | Streams local public events. |
1409 | """ | 1410 | """ |
1410 | return self.__stream('/api/v1/streaming/public/local', listener, async=async, timeout=timeout) | 1411 | return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1411 | 1412 | ||
1412 | @api_version("1.1.0", "1.4.2") | 1413 | @api_version("1.1.0", "1.4.2") |
1413 | def stream_hashtag(self, tag, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): | 1414 | def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): |
1414 | """ | 1415 | """ |
1415 | Stream for all public statuses for the hashtag 'tag' seen by the connected | 1416 | Stream for all public statuses for the hashtag 'tag' seen by the connected |
1416 | instance. | 1417 | instance. |
1417 | """ | 1418 | """ |
1418 | if tag.startswith("#"): | 1419 | if tag.startswith("#"): |
1419 | raise MastodonIllegalArgumentError("Tag parameter should omit leading #") | 1420 | raise MastodonIllegalArgumentError("Tag parameter should omit leading #") |
1420 | return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, timeout=timeout) | 1421 | return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1421 | 1422 | ||
1422 | @api_version("2.1.0", "2.1.0") | 1423 | @api_version("2.1.0", "2.1.0") |
1423 | def stream_list(self, id, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): | 1424 | def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): |
1424 | """ | 1425 | """ |
1425 | Stream events for the current user, restricted to accounts on the given | 1426 | Stream events for the current user, restricted to accounts on the given |
1426 | list. | 1427 | list. |
1427 | """ | 1428 | """ |
1428 | id = self.__unpack_id(id) | 1429 | id = self.__unpack_id(id) |
1429 | return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, timeout=timeout) | 1430 | return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) |
1430 | 1431 | ||
1431 | ### | 1432 | ### |
1432 | # Internal helpers, dragons probably | 1433 | # Internal helpers, dragons probably |
@@ -1668,7 +1669,7 @@ class Mastodon: | |||
1668 | 1669 | ||
1669 | return response | 1670 | return response |
1670 | 1671 | ||
1671 | def __stream(self, endpoint, listener, params={}, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): | 1672 | def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): |
1672 | """ | 1673 | """ |
1673 | Internal streaming API helper. | 1674 | Internal streaming API helper. |
1674 | 1675 | ||
@@ -1696,19 +1697,29 @@ class Mastodon: | |||
1696 | # The streaming server can't handle two slashes in a path, so remove trailing slashes | 1697 | # The streaming server can't handle two slashes in a path, so remove trailing slashes |
1697 | if url[-1] == '/': | 1698 | if url[-1] == '/': |
1698 | url = url[:-1] | 1699 | url = url[:-1] |
1699 | 1700 | ||
1700 | headers = {"Authorization": "Bearer " + self.access_token} | 1701 | # Connect function (called and then potentially passed to async handler) |
1701 | connection = requests.get(url + endpoint, headers = headers, data = params, stream = True, | 1702 | def connect_func(): |
1703 | headers = {"Authorization": "Bearer " + self.access_token} | ||
1704 | connection = requests.get(url + endpoint, headers = headers, data = params, stream = True, | ||
1702 | timeout=(self.request_timeout, timeout)) | 1705 | timeout=(self.request_timeout, timeout)) |
1703 | 1706 | ||
1704 | if connection.status_code != 200: | 1707 | if connection.status_code != 200: |
1705 | raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) | 1708 | raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) |
1706 | 1709 | return connection | |
1710 | connection = connect_func() | ||
1711 | |||
1712 | # Async stream handler | ||
1707 | class __stream_handle(): | 1713 | class __stream_handle(): |
1708 | def __init__(self, connection): | 1714 | def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec): |
1709 | self.closed = False | 1715 | self.closed = False |
1716 | self.running = True | ||
1710 | self.connection = connection | 1717 | self.connection = connection |
1711 | 1718 | self.connect_func = connect_func | |
1719 | self.reconnect_async = reconnect_async | ||
1720 | self.reconnect_async_wait_sec = reconnect_async_wait_sec | ||
1721 | self.reconnecting = False | ||
1722 | |||
1712 | def close(self): | 1723 | def close(self): |
1713 | self.closed = True | 1724 | self.closed = True |
1714 | self.connection.close() | 1725 | self.connection.close() |
@@ -1716,19 +1727,48 @@ class Mastodon: | |||
1716 | def is_alive(self): | 1727 | def is_alive(self): |
1717 | return self._thread.is_alive() | 1728 | return self._thread.is_alive() |
1718 | 1729 | ||
1730 | def is_receiving(self): | ||
1731 | if self.closed or not self.running or self.reconnecting or not self.is_alive(): | ||
1732 | return False | ||
1733 | else: | ||
1734 | return True | ||
1735 | |||
1719 | def _threadproc(self): | 1736 | def _threadproc(self): |
1720 | self._thread = threading.current_thread() | 1737 | self._thread = threading.current_thread() |
1721 | with closing(connection) as r: | 1738 | |
1722 | try: | 1739 | # Run until closed or until error if not autoreconnecting |
1723 | listener.handle_stream(r) | 1740 | while self.running: |
1724 | except AttributeError as e: | 1741 | with closing(self.connection) as r: |
1725 | if not self.closed: | 1742 | try: |
1726 | raise e | 1743 | listener.handle_stream(r) |
1744 | except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e: | ||
1745 | if not (self.closed or self.reconnect_async): | ||
1746 | raise e | ||
1747 | else: | ||
1748 | if self.closed: | ||
1749 | self.running = False | ||
1750 | |||
1751 | # Reconnect loop. Try immediately once, then with delays on error. | ||
1752 | if self.reconnect_async and not self.closed: | ||
1753 | self.reconnecting = True | ||
1754 | connect_success = False | ||
1755 | while not connect_success: | ||
1756 | connect_success = True | ||
1757 | try: | ||
1758 | self.connection = self.connect_func() | ||
1759 | if self.connection.status_code != 200: | ||
1760 | time.sleep(self.reconnect_async_wait_sec) | ||
1761 | connect_success = False | ||
1762 | except: | ||
1763 | time.sleep(self.reconnect_async_wait_sec) | ||
1764 | connect_success = False | ||
1765 | self.reconnecting = False | ||
1766 | else: | ||
1767 | self.running = False | ||
1727 | return 0 | 1768 | return 0 |
1728 | 1769 | ||
1729 | handle = __stream_handle(connection) | 1770 | if run_async: |
1730 | 1771 | handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec) | |
1731 | if async: | ||
1732 | t = threading.Thread(args=(), daemon = True, target=handle._threadproc) | 1772 | t = threading.Thread(args=(), daemon = True, target=handle._threadproc) |
1733 | t.start() | 1773 | t.start() |
1734 | return handle | 1774 | return handle |
diff --git a/mastodon/__init__.py b/mastodon/__init__.py index fdf776d..787d4e8 100644 --- a/mastodon/__init__.py +++ b/mastodon/__init__.py | |||
@@ -1,4 +1,4 @@ | |||
1 | from mastodon.Mastodon import Mastodon | 1 | from mastodon.Mastodon import Mastodon, MastodonError, MastodonVersionError, MastodonIllegalArgumentError, MastodonIOError, MastodonFileNotFoundError, MastodonNetworkError, MastodonAPIError, MastodonNotFoundError, MastodonUnauthorizedError, MastodonRatelimitError, MastodonMalformedEventError |
2 | from mastodon.streaming import StreamListener, CallbackStreamListener | 2 | from mastodon.streaming import StreamListener, CallbackStreamListener |
3 | 3 | ||
4 | __all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener'] | 4 | __all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener', 'MastodonError', 'MastodonVersionError', 'MastodonIllegalArgumentError', 'MastodonIOError', 'MastodonFileNotFoundError', 'MastodonNetworkError', 'MastodonAPIError', 'MastodonNotFoundError', 'MastodonUnauthorizedError', 'MastodonRatelimitError', 'MastodonMalformedEventError'] |
diff --git a/mastodon/streaming.py b/mastodon/streaming.py index d55ad54..1c73f48 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py | |||
@@ -6,7 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A | |||
6 | import json | 6 | import json |
7 | import six | 7 | import six |
8 | from mastodon import Mastodon | 8 | from mastodon import Mastodon |
9 | from mastodon.Mastodon import MastodonMalformedEventError | 9 | from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError |
10 | from requests.exceptions import ChunkedEncodingError | ||
10 | 11 | ||
11 | class StreamListener(object): | 12 | class StreamListener(object): |
12 | """Callbacks for the streaming API. Create a subclass, override the on_xxx | 13 | """Callbacks for the streaming API. Create a subclass, override the on_xxx |
@@ -43,25 +44,31 @@ class StreamListener(object): | |||
43 | """ | 44 | """ |
44 | event = {} | 45 | event = {} |
45 | line_buffer = bytearray() | 46 | line_buffer = bytearray() |
46 | for chunk in response.iter_content(chunk_size = 1): | 47 | try: |
47 | if chunk: | 48 | for chunk in response.iter_content(chunk_size = 1): |
48 | if chunk == b'\n': | 49 | if chunk: |
49 | try: | 50 | if chunk == b'\n': |
50 | line = line_buffer.decode('utf-8') | 51 | try: |
51 | except UnicodeDecodeError as err: | 52 | line = line_buffer.decode('utf-8') |
52 | six.raise_from( | 53 | except UnicodeDecodeError as err: |
53 | MastodonMalformedEventError("Malformed UTF-8"), | 54 | six.raise_from( |
54 | err | 55 | MastodonMalformedEventError("Malformed UTF-8"), |
55 | ) | 56 | err |
56 | if line == '': | 57 | ) |
57 | self._dispatch(event) | 58 | if line == '': |
58 | event = {} | 59 | self._dispatch(event) |
60 | event = {} | ||
61 | else: | ||
62 | event = self._parse_line(line, event) | ||
63 | line_buffer = bytearray() | ||
59 | else: | 64 | else: |
60 | event = self._parse_line(line, event) | 65 | line_buffer.extend(chunk) |
61 | line_buffer = bytearray() | 66 | except ChunkedEncodingError as err: |
62 | else: | 67 | six.raise_from( |
63 | line_buffer.extend(chunk) | 68 | MastodonNetworkError("Server ceased communication."), |
64 | 69 | err | |
70 | ) | ||
71 | |||
65 | def _parse_line(self, line, event): | 72 | def _parse_line(self, line, event): |
66 | if line.startswith(':'): | 73 | if line.startswith(':'): |
67 | self.handle_heartbeat() | 74 | self.handle_heartbeat() |
@@ -102,7 +109,6 @@ class StreamListener(object): | |||
102 | err | 109 | err |
103 | ) | 110 | ) |
104 | else: | 111 | else: |
105 | # TODO: allow handlers to return/raise to stop streaming cleanly | ||
106 | handler(payload) | 112 | handler(payload) |
107 | 113 | ||
108 | class CallbackStreamListener(StreamListener): | 114 | class CallbackStreamListener(StreamListener): |