aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenz Diener <[email protected]>2018-04-17 16:06:00 +0200
committerGitHub <[email protected]>2018-04-17 16:06:00 +0200
commit2afc50c803177fc282fac1a19fcd40e175d55f2d (patch)
tree7ce50a932d74a3ad902b6b4b4613c18b33937bb5
parent06a7a875fe2705044b98f3ef285944b5513be7de (diff)
parent864c83fa2fe7e09125a24f4a12acb4b1e285bc02 (diff)
downloadmastodon.py-2afc50c803177fc282fac1a19fcd40e175d55f2d.tar.gz
Merge branch 'master' into stream-timeout
-rw-r--r--docs/index.rst20
-rw-r--r--mastodon/Mastodon.py96
-rw-r--r--mastodon/__init__.py4
-rw-r--r--mastodon/streaming.py46
4 files changed, 110 insertions, 56 deletions
diff --git a/docs/index.rst b/docs/index.rst
index 5a1254d..4de0f6e 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -725,18 +725,26 @@ Streaming
725--------- 725---------
726These functions allow access to the streaming API. 726These functions allow access to the streaming API.
727 727
728If async is False, these methods block forever (or until an 728If `async` is False, these methods block forever (or until an error is encountered).
729exception is raised).
730 729
731If async is True, the listener will listen on another thread and these methods 730If `async` is True, the listener will listen on another thread and these methods
732will return a handle corresponding to the open connection. The 731will return a handle corresponding to the open connection. If, in addition, `async_reconnect` is True,
733connection may be closed at any time by calling the handles close() method, and the 732the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting
734status of the connection can be verified calling is_alive() on the handle. 733`async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made
734to "catch up" - toots made while the connection is broken will not be received.
735
736The connection may be closed at any time by calling the handles close() method. The
737current status of the handler thread can be checked with the handles is_alive() function,
738and the streaming status can be checked by calling is_receiving().
735 739
736The streaming functions take instances of `StreamListener` as the `listener` parameter. 740The streaming functions take instances of `StreamListener` as the `listener` parameter.
737A `CallbackStreamListener` class that allows you to specify function callbacks 741A `CallbackStreamListener` class that allows you to specify function callbacks
738directly is included for convenience. 742directly is included for convenience.
739 743
744When in not-async mode or async mode without async_reconnect, the stream functions may raise
745various exceptions: `MastodonMalformedEventError` if a received event cannot be parsed and
746`MastodonNetworkError` if any connection problems occur.
747
740.. automethod:: Mastodon.stream_user 748.. automethod:: Mastodon.stream_user
741.. automethod:: Mastodon.stream_public 749.. automethod:: Mastodon.stream_public
742.. automethod:: Mastodon.stream_local 750.. automethod:: Mastodon.stream_local
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index ea54649..5461661 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -90,6 +90,7 @@ class Mastodon:
90 __DEFAULT_BASE_URL = 'https://mastodon.social' 90 __DEFAULT_BASE_URL = 'https://mastodon.social'
91 __DEFAULT_TIMEOUT = 300 91 __DEFAULT_TIMEOUT = 300
92 __DEFAULT_STREAM_TIMEOUT = 300 92 __DEFAULT_STREAM_TIMEOUT = 300
93 __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5
93 __SUPPORTED_MASTODON_VERSION = "2.2.0" 94 __SUPPORTED_MASTODON_VERSION = "2.2.0"
94 95
95 ### 96 ###
@@ -1388,45 +1389,45 @@ class Mastodon:
1388 # Streaming 1389 # Streaming
1389 ### 1390 ###
1390 @api_version("1.1.0", "1.4.2") 1391 @api_version("1.1.0", "1.4.2")
1391 def stream_user(self, listener, async=False): 1392 def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1392 """ 1393 """
1393 Streams events that are relevant to the authorized user, i.e. home 1394 Streams events that are relevant to the authorized user, i.e. home
1394 timeline and notifications. 1395 timeline and notifications.
1395 """ 1396 """
1396 return self.__stream('/api/v1/streaming/user', listener, async=async) 1397 return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1397 1398
1398 @api_version("1.1.0", "1.4.2") 1399 @api_version("1.1.0", "1.4.2")
1399 def stream_public(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1400 def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1400 """ 1401 """
1401 Streams public events. 1402 Streams public events.
1402 """ 1403 """
1403 return self.__stream('/api/v1/streaming/public', listener, async=async, timeout=timeout) 1404 return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1404 1405
1405 @api_version("1.1.0", "1.4.2") 1406 @api_version("1.1.0", "1.4.2")
1406 def stream_local(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1407 def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1407 """ 1408 """
1408 Streams local public events. 1409 Streams local public events.
1409 """ 1410 """
1410 return self.__stream('/api/v1/streaming/public/local', listener, async=async, timeout=timeout) 1411 return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1411 1412
1412 @api_version("1.1.0", "1.4.2") 1413 @api_version("1.1.0", "1.4.2")
1413 def stream_hashtag(self, tag, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1414 def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1414 """ 1415 """
1415 Stream for all public statuses for the hashtag 'tag' seen by the connected 1416 Stream for all public statuses for the hashtag 'tag' seen by the connected
1416 instance. 1417 instance.
1417 """ 1418 """
1418 if tag.startswith("#"): 1419 if tag.startswith("#"):
1419 raise MastodonIllegalArgumentError("Tag parameter should omit leading #") 1420 raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
1420 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, timeout=timeout) 1421 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1421 1422
1422 @api_version("2.1.0", "2.1.0") 1423 @api_version("2.1.0", "2.1.0")
1423 def stream_list(self, id, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1424 def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1424 """ 1425 """
1425 Stream events for the current user, restricted to accounts on the given 1426 Stream events for the current user, restricted to accounts on the given
1426 list. 1427 list.
1427 """ 1428 """
1428 id = self.__unpack_id(id) 1429 id = self.__unpack_id(id)
1429 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, timeout=timeout) 1430 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1430 1431
1431 ### 1432 ###
1432 # Internal helpers, dragons probably 1433 # Internal helpers, dragons probably
@@ -1668,7 +1669,7 @@ class Mastodon:
1668 1669
1669 return response 1670 return response
1670 1671
1671 def __stream(self, endpoint, listener, params={}, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1672 def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1672 """ 1673 """
1673 Internal streaming API helper. 1674 Internal streaming API helper.
1674 1675
@@ -1696,19 +1697,29 @@ class Mastodon:
1696 # The streaming server can't handle two slashes in a path, so remove trailing slashes 1697 # The streaming server can't handle two slashes in a path, so remove trailing slashes
1697 if url[-1] == '/': 1698 if url[-1] == '/':
1698 url = url[:-1] 1699 url = url[:-1]
1699 1700
1700 headers = {"Authorization": "Bearer " + self.access_token} 1701 # Connect function (called and then potentially passed to async handler)
1701 connection = requests.get(url + endpoint, headers = headers, data = params, stream = True, 1702 def connect_func():
1703 headers = {"Authorization": "Bearer " + self.access_token}
1704 connection = requests.get(url + endpoint, headers = headers, data = params, stream = True,
1702 timeout=(self.request_timeout, timeout)) 1705 timeout=(self.request_timeout, timeout))
1703 1706
1704 if connection.status_code != 200: 1707 if connection.status_code != 200:
1705 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) 1708 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
1706 1709 return connection
1710 connection = connect_func()
1711
1712 # Async stream handler
1707 class __stream_handle(): 1713 class __stream_handle():
1708 def __init__(self, connection): 1714 def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec):
1709 self.closed = False 1715 self.closed = False
1716 self.running = True
1710 self.connection = connection 1717 self.connection = connection
1711 1718 self.connect_func = connect_func
1719 self.reconnect_async = reconnect_async
1720 self.reconnect_async_wait_sec = reconnect_async_wait_sec
1721 self.reconnecting = False
1722
1712 def close(self): 1723 def close(self):
1713 self.closed = True 1724 self.closed = True
1714 self.connection.close() 1725 self.connection.close()
@@ -1716,19 +1727,48 @@ class Mastodon:
1716 def is_alive(self): 1727 def is_alive(self):
1717 return self._thread.is_alive() 1728 return self._thread.is_alive()
1718 1729
1730 def is_receiving(self):
1731 if self.closed or not self.running or self.reconnecting or not self.is_alive():
1732 return False
1733 else:
1734 return True
1735
1719 def _threadproc(self): 1736 def _threadproc(self):
1720 self._thread = threading.current_thread() 1737 self._thread = threading.current_thread()
1721 with closing(connection) as r: 1738
1722 try: 1739 # Run until closed or until error if not autoreconnecting
1723 listener.handle_stream(r) 1740 while self.running:
1724 except AttributeError as e: 1741 with closing(self.connection) as r:
1725 if not self.closed: 1742 try:
1726 raise e 1743 listener.handle_stream(r)
1744 except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e:
1745 if not (self.closed or self.reconnect_async):
1746 raise e
1747 else:
1748 if self.closed:
1749 self.running = False
1750
1751 # Reconnect loop. Try immediately once, then with delays on error.
1752 if self.reconnect_async and not self.closed:
1753 self.reconnecting = True
1754 connect_success = False
1755 while not connect_success:
1756 connect_success = True
1757 try:
1758 self.connection = self.connect_func()
1759 if self.connection.status_code != 200:
1760 time.sleep(self.reconnect_async_wait_sec)
1761 connect_success = False
1762 except:
1763 time.sleep(self.reconnect_async_wait_sec)
1764 connect_success = False
1765 self.reconnecting = False
1766 else:
1767 self.running = False
1727 return 0 1768 return 0
1728 1769
1729 handle = __stream_handle(connection) 1770 if run_async:
1730 1771 handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec)
1731 if async:
1732 t = threading.Thread(args=(), daemon = True, target=handle._threadproc) 1772 t = threading.Thread(args=(), daemon = True, target=handle._threadproc)
1733 t.start() 1773 t.start()
1734 return handle 1774 return handle
diff --git a/mastodon/__init__.py b/mastodon/__init__.py
index fdf776d..787d4e8 100644
--- a/mastodon/__init__.py
+++ b/mastodon/__init__.py
@@ -1,4 +1,4 @@
1from mastodon.Mastodon import Mastodon 1from mastodon.Mastodon import Mastodon, MastodonError, MastodonVersionError, MastodonIllegalArgumentError, MastodonIOError, MastodonFileNotFoundError, MastodonNetworkError, MastodonAPIError, MastodonNotFoundError, MastodonUnauthorizedError, MastodonRatelimitError, MastodonMalformedEventError
2from mastodon.streaming import StreamListener, CallbackStreamListener 2from mastodon.streaming import StreamListener, CallbackStreamListener
3 3
4__all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener'] 4__all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener', 'MastodonError', 'MastodonVersionError', 'MastodonIllegalArgumentError', 'MastodonIOError', 'MastodonFileNotFoundError', 'MastodonNetworkError', 'MastodonAPIError', 'MastodonNotFoundError', 'MastodonUnauthorizedError', 'MastodonRatelimitError', 'MastodonMalformedEventError']
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
index d55ad54..1c73f48 100644
--- a/mastodon/streaming.py
+++ b/mastodon/streaming.py
@@ -6,7 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A
6import json 6import json
7import six 7import six
8from mastodon import Mastodon 8from mastodon import Mastodon
9from mastodon.Mastodon import MastodonMalformedEventError 9from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError
10from requests.exceptions import ChunkedEncodingError
10 11
11class StreamListener(object): 12class StreamListener(object):
12 """Callbacks for the streaming API. Create a subclass, override the on_xxx 13 """Callbacks for the streaming API. Create a subclass, override the on_xxx
@@ -43,25 +44,31 @@ class StreamListener(object):
43 """ 44 """
44 event = {} 45 event = {}
45 line_buffer = bytearray() 46 line_buffer = bytearray()
46 for chunk in response.iter_content(chunk_size = 1): 47 try:
47 if chunk: 48 for chunk in response.iter_content(chunk_size = 1):
48 if chunk == b'\n': 49 if chunk:
49 try: 50 if chunk == b'\n':
50 line = line_buffer.decode('utf-8') 51 try:
51 except UnicodeDecodeError as err: 52 line = line_buffer.decode('utf-8')
52 six.raise_from( 53 except UnicodeDecodeError as err:
53 MastodonMalformedEventError("Malformed UTF-8"), 54 six.raise_from(
54 err 55 MastodonMalformedEventError("Malformed UTF-8"),
55 ) 56 err
56 if line == '': 57 )
57 self._dispatch(event) 58 if line == '':
58 event = {} 59 self._dispatch(event)
60 event = {}
61 else:
62 event = self._parse_line(line, event)
63 line_buffer = bytearray()
59 else: 64 else:
60 event = self._parse_line(line, event) 65 line_buffer.extend(chunk)
61 line_buffer = bytearray() 66 except ChunkedEncodingError as err:
62 else: 67 six.raise_from(
63 line_buffer.extend(chunk) 68 MastodonNetworkError("Server ceased communication."),
64 69 err
70 )
71
65 def _parse_line(self, line, event): 72 def _parse_line(self, line, event):
66 if line.startswith(':'): 73 if line.startswith(':'):
67 self.handle_heartbeat() 74 self.handle_heartbeat()
@@ -102,7 +109,6 @@ class StreamListener(object):
102 err 109 err
103 ) 110 )
104 else: 111 else:
105 # TODO: allow handlers to return/raise to stop streaming cleanly
106 handler(payload) 112 handler(payload)
107 113
108class CallbackStreamListener(StreamListener): 114class CallbackStreamListener(StreamListener):
Powered by cgit v1.2.3 (git 2.41.0)