33from __future__ import annotations
44
55import datetime
6+ import time
7+ from collections import Counter
68from typing import TYPE_CHECKING , ClassVar
79
810import requests
@@ -40,32 +42,71 @@ def commands(self):
4042 lbupdate_cmd = ui .Subcommand (
4143 "lbimport" , help = "Import ListenBrainz history"
4244 )
45+ lbupdate_cmd .parser .add_option (
46+ "--max" ,
47+ dest = "max_listens" ,
48+ type = "int" ,
49+ default = None ,
50+ help = "maximum number of listens to fetch (default: all)" ,
51+ )
4352
4453 def func (lib , opts , args ):
45- self ._lbupdate (lib , self ._log )
54+ self ._lbupdate (lib , self ._log , max_listens = opts . max_listens )
4655
4756 lbupdate_cmd .func = func
4857 return [lbupdate_cmd ]
4958
50- def _lbupdate (self , lib , log ):
51- """Obtain view count from Listenbrainz ."""
52- found_total = 0
53- unknown_total = 0
54- ls = self . get_listens ( )
55- tracks = self . get_tracks_from_listens ( ls )
56- log . info ( "Found {} listens" , len ( ls ))
57- if tracks :
58- found , unknown = update_play_counts (
59- lib , tracks , log , "listenbrainz"
60- )
61- found_total += found
62- unknown_total += unknown
59+ def _lbupdate (self , lib , log , max_listens = None ):
60+ """Obtain play counts from ListenBrainz ."""
61+ listens = self . get_listens ( max_total = max_listens )
62+ if listens is None :
63+ log . error ( "Failed to fetch listens from ListenBrainz." )
64+ return
65+ if not listens :
66+ log . info ( "No listens found." )
67+ return
68+ log . info ( "Found {} listens" , len ( listens ))
69+ tracks = self . _aggregate_listens ( self . get_tracks_from_listens ( listens ) )
70+ log . info ( "Aggregated into {} unique tracks" , len ( tracks ))
71+ found , unknown = update_play_counts ( lib , tracks , log , "listenbrainz" )
6372 log .info ("... done!" )
64- log .info ("{} unknown play-counts" , unknown_total )
65- log .info ("{} play-counts imported" , found_total )
73+ log .info ("{} unknown play-counts" , unknown )
74+ log .info ("{} play-counts imported" , found )
75+
76+ @staticmethod
77+ def _aggregate_listens (tracks : list [Track ]) -> list [Track ]:
78+ """Aggregate individual listen events into per-track play counts.
79+
80+ ListenBrainz returns individual listen events (each with playcount=1).
81+ We aggregate them by track identity so each unique track gets its total
82+ count, making the import idempotent.
83+ """
84+ _agg_key = str | tuple [str , str , str ]
85+ play_counts : Counter [_agg_key ] = Counter ()
86+ track_info : dict [_agg_key , Track ] = {}
87+ for t in tracks :
88+ mbid = t .get ("mbid" ) or ""
89+ artist = t ["artist" ]
90+ name = t ["name" ]
91+ album = t .get ("album" ) or ""
92+
93+ key : _agg_key = mbid if mbid else (artist , name , album )
94+ play_counts [key ] += 1
95+ if key not in track_info :
96+ track_info [key ] = t
97+
98+ return [
99+ {** info , "playcount" : play_counts [key ]}
100+ for key , info in track_info .items ()
101+ ]
66102
67103 def _make_request (self , url , params = None ):
68- """Makes a request to the ListenBrainz API."""
104+ """Makes a request to the ListenBrainz API.
105+
106+ Respects the X-RateLimit-* headers returned by the server: if the
107+ remaining quota drops to zero, sleeps until the window resets before
108+ returning, so the next call is guaranteed a fresh quota.
109+ """
69110 try :
70111 response = requests .get (
71112 url = url ,
@@ -74,47 +115,81 @@ def _make_request(self, url, params=None):
74115 params = params ,
75116 )
76117 response .raise_for_status ()
118+ remaining = response .headers .get ("X-RateLimit-Remaining" )
119+ reset_in = response .headers .get ("X-RateLimit-Reset-In" )
120+ if remaining is not None and int (remaining ) == 0 and reset_in :
121+ self ._log .debug (
122+ "ListenBrainz rate limit reached; sleeping {}s" , reset_in
123+ )
124+ time .sleep (int (reset_in ) + 1 )
77125 return response .json ()
78126 except requests .exceptions .RequestException as e :
79127 self ._log .debug ("Invalid Search Error: {}" , e )
80128 return None
81129
82- def get_listens (self , min_ts = None , max_ts = None , count = None ):
130+ def get_listens (self , min_ts = None , max_ts = None , count = None , max_total = None ):
83131 """Gets the listen history of a given user.
84132
133+ Paginates through all available listens using the max_ts parameter.
134+
85135 Args:
86- username: User to get listen history of.
87136 min_ts: History before this timestamp will not be returned.
88137 DO NOT USE WITH max_ts.
89138 max_ts: History after this timestamp will not be returned.
90139 DO NOT USE WITH min_ts.
91- count: How many listens to return. If not specified,
92- uses a default from the server .
140+ count: How many listens to return per page (max 1000).
141+ max_total: Stop after fetching this many listens in total .
93142
94143 Returns:
95- A list of listen info dictionaries if there's an OK status.
96-
97- Raises:
98- An HTTPError if there's a failure.
99- A ValueError if the JSON in the response is invalid.
100- An IndexError if the JSON is not structured as expected.
144+ A list of listen info dictionaries, or None on API failure.
101145 """
146+ if min_ts is not None and max_ts is not None :
147+ raise ValueError ("min_ts and max_ts are mutually exclusive." )
148+
149+ per_page = min (count or 1000 , 1000 )
102150 url = f"{ self .ROOT } /user/{ self .username } /listens"
103- params = {
104- k : v
105- for k , v in {
106- "min_ts" : min_ts ,
107- "max_ts" : max_ts ,
108- "count" : count ,
109- }.items ()
110- if v is not None
111- }
112- response = self ._make_request (url , params )
113-
114- if response is not None :
115- return response ["payload" ]["listens" ]
116- else :
117- return None
151+ all_listens = []
152+
153+ while True :
154+ if max_total is not None :
155+ remaining_needed = max_total - len (all_listens )
156+ if remaining_needed <= 0 :
157+ break
158+ page_size = min (per_page , remaining_needed )
159+ else :
160+ page_size = per_page
161+
162+ params = {"count" : page_size }
163+ if max_ts is not None :
164+ params ["max_ts" ] = max_ts
165+ if min_ts is not None :
166+ params ["min_ts" ] = min_ts
167+
168+ response = self ._make_request (url , params )
169+ if response is None :
170+ if not all_listens :
171+ return None
172+ break
173+
174+ listens = response ["payload" ]["listens" ]
175+ if not listens :
176+ break
177+
178+ all_listens .extend (listens )
179+ self ._log .info ("Fetched {} listens so far..." , len (all_listens ))
180+
181+ # If we got fewer than requested, we've reached the end
182+ if len (listens ) < page_size :
183+ break
184+
185+ # Paginate using the oldest listen's timestamp.
186+ # Subtract 1 to avoid re-fetching listens at the boundary.
187+ new_max_ts = listens [- 1 ]["listened_at" ] - 1
188+ if max_ts is not None and new_max_ts >= max_ts :
189+ break
190+ max_ts = new_max_ts
191+
192+ return all_listens
118193
119194 def get_tracks_from_listens (self , listens ) -> list [Track ]:
120195 """Returns a list of tracks from a list of listens."""
@@ -123,10 +198,7 @@ def get_tracks_from_listens(self, listens) -> list[Track]:
123198 if track ["track_metadata" ].get ("release_name" ) is None :
124199 continue
125200 mbid_mapping = track ["track_metadata" ].get ("mbid_mapping" , {})
126- mbid = None
127- if mbid_mapping .get ("recording_mbid" ) is None :
128- # search for the track using title and release
129- mbid = self .get_mb_recording_id (track )
201+ mbid = mbid_mapping .get ("recording_mbid" )
130202 tracks .append (
131203 {
132204 "album" : (
0 commit comments