import requests as rq import json from datetime import datetime, timedelta COUNTTYPE_DAYS = 'days' COUNTTYPE_TRACKS = 'tracks' class PlaylistUpdater: def __init__(self, db, auth_token, spotify_api_endpoint='https://api.spotify.com/v1'): self._db = db self._tracks = [] self._auth_token = auth_token self._spotify_api_endpoint = spotify_api_endpoint self._update_tracks() def _update_tracks(self): self._tracks = [] rnext = f"{self._spotify_api_endpoint}/me/tracks?limit=50" while rnext is not None: r = self._fetch(rnext); self._tracks += r['items'] rnext = r['next'] def _fetch(self, url): headers = { 'Authorization': f"Bearer {self._auth_token}" } while True: r = rq.get(f"{url}", headers=headers) if 200 <= r.status_code <= 299: return r.json() # slow down if spotify is applying rate limiting if r.status_code == 429: time.sleep(30) # don't try again in a crazy fast loop to avoid rate limits time.sleep(1) def update_playlist(self, playlist_id): pltracks = [] newtracklist = [] count = self._db.get_user_playlist_count(playlist_id) counttype = self._db.get_user_playlist_counttype(playlist_id) headers = { 'Authorization': f"Bearer {self._auth_token}" } added_at_dict = {} r = self._fetch(f"{self._spotify_api_endpoint}/playlists/{playlist_id}?fields=tracks,snapshot_id") snapshot_id = r['snapshot_id'] pltracks = [t['track']['id'] for t in r['tracks']['items']] rnext = r['tracks']['next'] while rnext is not None: r = self._fetch(rnext) for track in r['items']: pltracks.append(track['track']['id']) rnext = r['next'] for i, track in enumerate(self._tracks): # for some reason, python datetime doesn't support utc zulu timezone added_at = datetime.fromisoformat(track['added_at'].replace('Z', '+00:00')).timestamp() added_at_dict[track['track']['id']] = added_at if counttype == COUNTTYPE_DAYS: if added_at < (datetime.now() - timedelta(days=count)).timestamp(): break if counttype == COUNTTYPE_TRACKS and i >= count: break newtracklist.append(track['track']['id']) pltracks = set(pltracks) newtracklist = set(newtracklist) toremove = list(pltracks - newtracklist) toadd = sorted(list(newtracklist - pltracks), key = lambda t: added_at_dict[t]) print(f"{pltracks=}") print(f"{newtracklist=}") print(f"{toremove=}") print(f"{toadd=}") print(f"{count=}") print(f"{counttype=}") print(f"{COUNTTYPE_TRACKS=}") print(f"{counttype==COUNTTYPE_TRACKS=}") while toremove: tracks = [] for i in range(100): if not toremove: break tracks.append({ 'uri': f"spotify:track:{toremove.pop()}" }) r = rq.delete( f"{self._spotify_api_endpoint}/playlists/{playlist_id}/tracks", headers = headers, json = { 'tracks': tracks, 'snapshot_id': snapshot_id } ).json() print(f"{json.dumps(r, indent=2)=}") while toadd: tracks = [] for i in range(100): if not toadd: break tracks.append(f"spotify:track:{toadd.pop()}") r = rq.post( f"{self._spotify_api_endpoint}/playlists/{playlist_id}/tracks", headers = headers, json = { 'uris': tracks, 'position': 0 } ) print(f"{r.text=}")