aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenz Diener <[email protected]>2017-09-08 15:03:15 +0200
committerGitHub <[email protected]>2017-09-08 15:03:15 +0200
commit4fbeb7245f690cf46f8c761a85c55341159b82b9 (patch)
tree4d93477df1050df501196a217ba0ac7f84e3f506 /mastodon
parent0edc424b11a14ce2908ac16739b910febdd1e140 (diff)
parent7de02fe5b8da76fc3d8f6aa063996433119a2214 (diff)
downloadmastodon.py-4fbeb7245f690cf46f8c761a85c55341159b82b9.tar.gz
Merge pull request #69 from Chronister/async_streaming
Add async parameter to streaming API calls.
Diffstat (limited to 'mastodon')
-rw-r--r--mastodon/Mastodon.py98
1 files changed, 70 insertions, 28 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index f073a66..a74f59a 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -15,6 +15,7 @@ import dateutil
15import dateutil.parser 15import dateutil.parser
16import re 16import re
17import copy 17import copy
18import threading
18 19
19 20
20class Mastodon: 21class Mastodon:
@@ -110,9 +111,9 @@ class Mastodon:
110 self._token_expired = datetime.datetime.now() 111 self._token_expired = datetime.datetime.now()
111 self._refresh_token = None 112 self._refresh_token = None
112 113
113 self.ratelimit_limit = 300 114 self.ratelimit_limit = 150
114 self.ratelimit_reset = time.time() 115 self.ratelimit_reset = time.time()
115 self.ratelimit_remaining = 300 116 self.ratelimit_remaining = 150
116 self.ratelimit_lastcall = time.time() 117 self.ratelimit_lastcall = time.time()
117 self.ratelimit_pacefactor = ratelimit_pacefactor 118 self.ratelimit_pacefactor = ratelimit_pacefactor
118 119
@@ -864,47 +865,59 @@ class Mastodon:
864 ### 865 ###
865 # Streaming 866 # Streaming
866 ### 867 ###
867 def user_stream(self, listener): 868 def user_stream(self, listener, async=False):
868 """ 869 """
869 Streams events that are relevant to the authorized user, i.e. home 870 Streams events that are relevant to the authorized user, i.e. home
870 timeline and notifications. 'listener' should be a subclass of 871 timeline and notifications. 'listener' should be a subclass of
871 StreamListener. 872 StreamListener which will receive callbacks for incoming events.
872 873
873 This method blocks forever, calling callbacks on 'listener' for 874 If async is False, this method blocks forever.
874 incoming events. 875
876 If async is True, 'listener' will listen on another thread and this method
877 will return a handle corresponding to the open connection. The
878 connection may be closed at any time by calling its close() method.
875 """ 879 """
876 return self.__stream('/api/v1/streaming/user', listener) 880 return self.__stream('/api/v1/streaming/user', listener, async=async)
877 881
878 def public_stream(self, listener): 882 def public_stream(self, listener, async=False):
879 """ 883 """
880 Streams public events. 'listener' should be a subclass of 884 Streams public events. 'listener' should be a subclass of StreamListener
881 StreamListener. 885 which will receive callbacks for incoming events.
886
887 If async is False, this method blocks forever.
882 888
883 This method blocks forever, calling callbacks on 'listener' for 889 If async is True, 'listener' will listen on another thread and this method
884 incoming events. 890 will return a handle corresponding to the open connection. The
891 connection may be closed at any time by calling its close() method.
885 """ 892 """
886 return self.__stream('/api/v1/streaming/public', listener) 893 return self.__stream('/api/v1/streaming/public', listener, async=async)
887 894
888 def local_stream(self, listener): 895 def local_stream(self, listener, async=False):
889 """ 896 """
890 Streams local events. 'listener' should be a subclass of 897 Streams local events. 'listener' should be a subclass of StreamListener
891 StreamListener. 898 which will receive callbacks for incoming events.
892 899
893 This method blocks forever, calling callbacks on 'listener' for 900 If async is False, this method blocks forever.
894 incoming events. 901
902 If async is True, 'listener' will listen on another thread and this method
903 will return a handle corresponding to the open connection. The
904 connection may be closed at any time by calling its close() method.
895 """ 905 """
896 return self.__stream('/api/v1/streaming/public/local', listener) 906 return self.__stream('/api/v1/streaming/public/local', listener, async=async)
897 907
898 def hashtag_stream(self, tag, listener): 908 def hashtag_stream(self, tag, listener, async=False):
899 """ 909 """
900 Returns all public statuses for the hashtag 'tag'. 'listener' should be 910 Returns all public statuses for the hashtag 'tag'. 'listener' should be
901 a subclass of StreamListener. 911 a subclass of StreamListener which will receive callbacks for incoming
912 events.
913
914 If async is False, this method blocks forever.
902 915
903 This method blocks forever, calling callbacks on 'listener' for 916 If async is True, 'listener' will listen on another thread and this method
904 incoming events. 917 will return a handle corresponding to the open connection. The
918 connection may be closed at any time by calling its close() method.
905 """ 919 """
906 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener) 920 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener)
907
908 ### 921 ###
909 # Internal helpers, dragons probably 922 # Internal helpers, dragons probably
910 ### 923 ###
@@ -1080,18 +1093,47 @@ class Mastodon:
1080 1093
1081 return response 1094 return response
1082 1095
1083 def __stream(self, endpoint, listener, params={}): 1096 def __stream(self, endpoint, listener, params={}, async=False):
1084 """ 1097 """
1085 Internal streaming API helper. 1098 Internal streaming API helper.
1099
1100 Returns a handle to the open connection that the user can close if they
1101 wish to terminate it.
1086 """ 1102 """
1087 1103
1088 headers = {} 1104 headers = {}
1089 if self.access_token is not None: 1105 if self.access_token is not None:
1090 headers = {'Authorization': 'Bearer ' + self.access_token} 1106 headers = {'Authorization': 'Bearer ' + self.access_token}
1091
1092 url = self.api_base_url + endpoint 1107 url = self.api_base_url + endpoint
1093 with closing(requests.get(url, headers=headers, data=params, stream=True)) as r: 1108
1094 listener.handle_stream(r.iter_lines()) 1109 connection = requests.get(url, headers = headers, data = params, stream = True)
1110
1111 class __stream_handle():
1112 def __init__(self, connection):
1113 self.connection = connection
1114
1115 def close(self):
1116 self.connection.close()
1117
1118 def _threadproc(self):
1119 with closing(connection) as r:
1120 try:
1121 listener.handle_stream(r.iter_lines())
1122 except AttributeError as e:
1123 # Eat AttributeError from requests if user closes early
1124 pass
1125 return 0
1126
1127 handle = __stream_handle(connection)
1128
1129 if async:
1130 t = threading.Thread(args=(), target=handle._threadproc)
1131 t.start()
1132 return handle
1133 else:
1134 # Blocking, never returns (can only leave via exception)
1135 with closing(connection) as r:
1136 listener.handle_stream(r.iter_lines())
1095 1137
1096 def __generate_params(self, params, exclude=[]): 1138 def __generate_params(self, params, exclude=[]):
1097 """ 1139 """
Powered by cgit v1.2.3 (git 2.41.0)