heartbeats/common/update_playlists.py

112 lines
3.6 KiB
Python

import requests as rq
import json
from datetime import datetime, timedelta
COUNTTYPE_DAYS = 'days'
COUNTTYPE_TRACKS = 'tracks'
class PlaylistUpdater:
def __init__(self, db, auth_token, spotify_api_endpoint='https://api.spotify.com/v1'):
self._db = db
self._tracks = []
self._auth_token = auth_token
self._spotify_api_endpoint = spotify_api_endpoint
self._update_tracks()
def _update_tracks(self):
self._tracks = []
headers = { 'Authorization': f"Bearer {self._auth_token}" }
rnext = f"{self._spotify_api_endpoint}/me/tracks?limit=50"
while rnext is not None:
r = rq.get(rnext, headers = headers).json()
self._tracks += r['items']
rnext = r['next']
def update_playlist(self, playlist_id):
pltracks = []
newtracklist = []
count = self._db.get_user_playlist_count(playlist_id)
counttype = self._db.get_user_playlist_counttype(playlist_id)
headers = { 'Authorization': f"Bearer {self._auth_token}" }
added_at_dict = {}
r = rq.get(f"{self._spotify_api_endpoint}/playlists/{playlist_id}?fields=tracks,snapshot_id", headers = headers).json()
snapshot_id = r['snapshot_id']
pltracks = [t['track']['id'] for t in r['tracks']['items']]
rnext = r['tracks']['next']
while rnext is not None:
r = rq.get(rnext, headers = headers).json()
for track in r['items']:
pltracks.append(track['track']['id'])
rnext = r['next']
for i, track in enumerate(self._tracks):
# for some reason, python datetime doesn't support utc zulu timezone
added_at = datetime.fromisoformat(track['added_at'].replace('Z', '+00:00')).timestamp()
added_at_dict[track['track']['id']] = added_at
if counttype == COUNTTYPE_DAYS:
if added_at < (datetime.now() - timedelta(days=count)).timestamp():
break
if counttype == COUNTTYPE_TRACKS and i >= count:
break
newtracklist.append(track['track']['id'])
pltracks = set(pltracks)
newtracklist = set(newtracklist)
toremove = list(pltracks - newtracklist)
toadd = sorted(list(newtracklist - pltracks), key = lambda t: added_at_dict[t])
print(f"{pltracks=}")
print(f"{newtracklist=}")
print(f"{toremove=}")
print(f"{toadd=}")
print(f"{count=}")
print(f"{counttype=}")
print(f"{COUNTTYPE_TRACKS=}")
print(f"{counttype==COUNTTYPE_TRACKS=}")
while toremove:
tracks = []
for i in range(100):
if not toremove:
break
tracks.append({
'uri': f"spotify:track:{toremove.pop()}"
})
r = rq.delete(
f"{self._spotify_api_endpoint}/playlists/{playlist_id}/tracks",
headers = headers,
json = {
'tracks': tracks,
'snapshot_id': snapshot_id
}
).json()
print(f"{json.dumps(r, indent=2)=}")
while toadd:
tracks = []
for i in range(100):
if not toadd:
break
tracks.append(f"spotify:track:{toadd.pop()}")
r = rq.post(
f"{self._spotify_api_endpoint}/playlists/{playlist_id}/tracks",
headers = headers,
json = {
'uris': tracks,
'position': 0
}
)
print(f"{r.text=}")