aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenz Diener <[email protected]>2018-05-06 15:58:37 +0200
committerGitHub <[email protected]>2018-05-06 15:58:37 +0200
commit1a62b6a5a65179fc6e098b8029078197e114fc98 (patch)
tree98328e8776447334b19592992016d9b7f16ae98b /mastodon
parentfbd4122fec092bff6b1cc9f44dfeda6ee693c41b (diff)
parentca0ea36c6edd58dc15e5fd7f31f24ba5097d6e8d (diff)
downloadmastodon.py-1a62b6a5a65179fc6e098b8029078197e114fc98.tar.gz
Merge pull request #128 from codl/stream-timeout
add timeouts to streams
Diffstat (limited to 'mastodon')
-rw-r--r--mastodon/Mastodon.py31
-rw-r--r--mastodon/streaming.py11
2 files changed, 27 insertions, 15 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index 6ab9291..2e620ba 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -89,6 +89,7 @@ class Mastodon:
89 """ 89 """
90 __DEFAULT_BASE_URL = 'https://mastodon.social' 90 __DEFAULT_BASE_URL = 'https://mastodon.social'
91 __DEFAULT_TIMEOUT = 300 91 __DEFAULT_TIMEOUT = 300
92 __DEFAULT_STREAM_TIMEOUT = 300
92 __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5 93 __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5
93 __SUPPORTED_MASTODON_VERSION = "2.3.0" 94 __SUPPORTED_MASTODON_VERSION = "2.3.0"
94 95
@@ -1430,45 +1431,45 @@ class Mastodon:
1430 # Streaming 1431 # Streaming
1431 ### 1432 ###
1432 @api_version("1.1.0", "1.4.2") 1433 @api_version("1.1.0", "1.4.2")
1433 def stream_user(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): 1434 def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1434 """ 1435 """
1435 Streams events that are relevant to the authorized user, i.e. home 1436 Streams events that are relevant to the authorized user, i.e. home
1436 timeline and notifications. 1437 timeline and notifications.
1437 """ 1438 """
1438 return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) 1439 return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1439 1440
1440 @api_version("1.1.0", "1.4.2") 1441 @api_version("1.1.0", "1.4.2")
1441 def stream_public(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): 1442 def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1442 """ 1443 """
1443 Streams public events. 1444 Streams public events.
1444 """ 1445 """
1445 return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) 1446 return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1446 1447
1447 @api_version("1.1.0", "1.4.2") 1448 @api_version("1.1.0", "1.4.2")
1448 def stream_local(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): 1449 def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1449 """ 1450 """
1450 Streams local public events. 1451 Streams local public events.
1451 """ 1452 """
1452 return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) 1453 return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1453 1454
1454 @api_version("1.1.0", "1.4.2") 1455 @api_version("1.1.0", "1.4.2")
1455 def stream_hashtag(self, tag, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): 1456 def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1456 """ 1457 """
1457 Stream for all public statuses for the hashtag 'tag' seen by the connected 1458 Stream for all public statuses for the hashtag 'tag' seen by the connected
1458 instance. 1459 instance.
1459 """ 1460 """
1460 if tag.startswith("#"): 1461 if tag.startswith("#"):
1461 raise MastodonIllegalArgumentError("Tag parameter should omit leading #") 1462 raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
1462 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) 1463 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1463 1464
1464 @api_version("2.1.0", "2.1.0") 1465 @api_version("2.1.0", "2.1.0")
1465 def stream_list(self, id, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): 1466 def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1466 """ 1467 """
1467 Stream events for the current user, restricted to accounts on the given 1468 Stream events for the current user, restricted to accounts on the given
1468 list. 1469 list.
1469 """ 1470 """
1470 id = self.__unpack_id(id) 1471 id = self.__unpack_id(id)
1471 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) 1472 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1472 1473
1473 ### 1474 ###
1474 # Internal helpers, dragons probably 1475 # Internal helpers, dragons probably
@@ -1710,7 +1711,7 @@ class Mastodon:
1710 1711
1711 return response 1712 return response
1712 1713
1713 def __stream(self, endpoint, listener, params={}, run_async=False, reconnect_async=False, reconnect_async_wait_sec=5): 1714 def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1714 """ 1715 """
1715 Internal streaming API helper. 1716 Internal streaming API helper.
1716 1717
@@ -1742,7 +1743,9 @@ class Mastodon:
1742 # Connect function (called and then potentially passed to async handler) 1743 # Connect function (called and then potentially passed to async handler)
1743 def connect_func(): 1744 def connect_func():
1744 headers = {"Authorization": "Bearer " + self.access_token} 1745 headers = {"Authorization": "Bearer " + self.access_token}
1745 connection = self.session.get(url + endpoint, headers = headers, data = params, stream = True) 1746 connection = self.session.get(url + endpoint, headers = headers, data = params, stream = True,
1747 timeout=(self.request_timeout, timeout))
1748
1746 if connection.status_code != 200: 1749 if connection.status_code != 200:
1747 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) 1750 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
1748 return connection 1751 return connection
@@ -1912,6 +1915,10 @@ class MastodonNetworkError(MastodonIOError):
1912 """Raised when network communication with the server fails""" 1915 """Raised when network communication with the server fails"""
1913 pass 1916 pass
1914 1917
1918class MastodonReadTimeout(MastodonNetworkError):
1919 """Raised when a stream times out"""
1920 pass
1921
1915 1922
1916class MastodonAPIError(MastodonError): 1923class MastodonAPIError(MastodonError):
1917 """Raised when the mastodon API generates a response that cannot be handled""" 1924 """Raised when the mastodon API generates a response that cannot be handled"""
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
index 1c73f48..3fbd569 100644
--- a/mastodon/streaming.py
+++ b/mastodon/streaming.py
@@ -6,8 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A
6import json 6import json
7import six 7import six
8from mastodon import Mastodon 8from mastodon import Mastodon
9from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError 9from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout
10from requests.exceptions import ChunkedEncodingError 10from requests.exceptions import ChunkedEncodingError, ReadTimeout
11 11
12class StreamListener(object): 12class StreamListener(object):
13 """Callbacks for the streaming API. Create a subclass, override the on_xxx 13 """Callbacks for the streaming API. Create a subclass, override the on_xxx
@@ -68,7 +68,12 @@ class StreamListener(object):
68 MastodonNetworkError("Server ceased communication."), 68 MastodonNetworkError("Server ceased communication."),
69 err 69 err
70 ) 70 )
71 71 except MastodonReadTimeout as err:
72 six.raise_from(
73 MastodonReadTimeout("Timed out while reading from server."),
74 err
75 )
76
72 def _parse_line(self, line, event): 77 def _parse_line(self, line, event):
73 if line.startswith(':'): 78 if line.startswith(':'):
74 self.handle_heartbeat() 79 self.handle_heartbeat()
Powered by cgit v1.2.3 (git 2.41.0)