257 lines
8.7 KiB
Python
257 lines
8.7 KiB
Python
import os
|
|
from flask import Flask, render_template, request, make_response, redirect, url_for
|
|
import requests as rq
|
|
from requests_futures.sessions import FuturesSession
|
|
import urllib.parse
|
|
import base64
|
|
from datetime import date, datetime
|
|
import time
|
|
import json
|
|
|
|
from common import Db, PlaylistUpdater
|
|
import uuid
|
|
|
|
def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        ValueError: if the variable is not set at all. An empty string is
            accepted, exactly as the original per-variable checks did.
    """
    value = os.environ.get(name, None)
    if value is None:
        raise ValueError(f"{name} cannot be None, set using environment variable")
    return value


app = Flask(__name__)

# Lifetime in seconds (365.25 days) of the per-browser ``user_secret``:
# used as the cookie expiry offset and passed to the Db as the secret's expiry.
USER_SECRET_EXPIRATION = 31557600

# NOTE(review): URL_PREFIX is read here but not applied anywhere in the code
# visible in this file; the APPLICATION_ROOT wiring below is disabled.
URL_PREFIX = os.environ.get('URL_PREFIX', '/')
#app.config.update(APPLICATION_ROOT=URL_PREFIX)

# Required configuration: the module refuses to import without these.
CLIENT_ID = _require_env('CLIENT_ID')
CLIENT_SECRET = _require_env('CLIENT_SECRET')

# Prepended to url_for('cb') to build the OAuth redirect URI.
BASE_URL = _require_env('BASE_URL')

db = Db('redis', 6379, 0, CLIENT_ID, CLIENT_SECRET)
|
|
|
|
# Spotify endpoints, each overridable through an environment variable of the
# same name (useful for pointing the app at a stub server).

# Login page the browser is sent to; the trailing '?' is kept because the
# urlencoded query string is appended directly to this value.
SPOTIFY_AUTHORIZE_ENDPOINT = os.environ.get('SPOTIFY_AUTHORIZE_ENDPOINT', 'https://accounts.spotify.com/authorize?')

# Token exchange URL. This previously read SPOTIFY_AUTHORIZE_ENDPOINT, so
# overriding the authorize URL silently replaced the token URL as well.
SPOTIFY_TOKEN_ENDPOINT = os.environ.get('SPOTIFY_TOKEN_ENDPOINT', 'https://accounts.spotify.com/api/token')

# Base URL of the Web API (no trailing slash; paths are appended with '/').
SPOTIFY_API_ENDPOINT = os.environ.get('SPOTIFY_API_ENDPOINT', 'https://api.spotify.com/v1')
|
|
|
|
# Spotify OAuth scopes requested at login. They are sent to the authorize
# endpoint as a single space-separated string.
_SCOPE_NAMES = (
    'playlist-read-private',
    'playlist-read-collaborative',
    'playlist-modify-private',
    'playlist-modify-public',
    'user-read-email',
    'user-library-read',
    'user-library-modify',
)
SCOPES = ' '.join(_SCOPE_NAMES)
|
|
|
|
|
|
def isauth(cookies):
    """Return True when the request cookies identify a logged-in user.

    Applies the same rule as ``index``: the user must have a stored Spotify
    auth token AND the ``user_secret`` cookie must equal the secret stored
    for that ``spotify_id``.

    Args:
        cookies: mapping of request cookies (e.g. ``request.cookies``).

    Returns:
        bool: False when either cookie is missing or empty, when no auth
        token is stored for the user, or when the secret does not match.
    """
    user_id = cookies.get('spotify_id')
    user_secret = cookies.get('user_secret')

    # Without both cookies there is nothing to verify. This also prevents a
    # None == None comparison from passing as "authenticated".
    if not user_id or not user_secret:
        return False

    # The token was previously fetched but never checked, so a user with a
    # matching secret but no usable token was still treated as logged in.
    auth_token = db.get_user_auth_token(user_id)
    if not auth_token:
        return False

    return user_secret == db.get_user_client_secret(user_id)
|
|
|
|
def create_empty_message():
    """Build a fresh placeholder message for the templates.

    Returns:
        dict: ``type`` is the empty string and ``text`` is a single
        non-breaking space. A new dict is created on every call, so callers
        may mutate it freely.
    """
    placeholder = dict(type='', text=u'\u00A0')
    return placeholder
|
|
|
|
|
|
def _message_from_query(raw_message):
    """Decode the JSON ``message`` query parameter into a message dict.

    The parameter is user-controlled, so anything missing, malformed or not a
    JSON object falls back to the empty placeholder message instead of
    raising (which previously surfaced as a 500).
    """
    if not raw_message:
        return create_empty_message()
    try:
        parsed = json.loads(raw_message)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return create_empty_message()
    return parsed if isinstance(parsed, dict) else create_empty_message()


@app.route("/", methods=['GET', 'POST'])
def index():
    """Landing page: login prompt, playlist form, or playlist creation.

    * Not logged in (no stored auth token, or the ``user_secret`` cookie does
      not match the stored secret): render ``index_unauthorised.html`` with a
      Spotify login URL and issue a fresh ``user_secret`` cookie.
    * Logged in, GET: render ``index_authorised.html``, showing the optional
      JSON-encoded ``message`` query parameter.
    * Logged in, POST: create a playlist from the form fields ``pname``,
      ``count``, ``counttype`` and ``public``, record it in the Db, trigger
      an initial update, and render the result message.
    """
    user_id = request.cookies.get('spotify_id')
    user_secret = request.cookies.get('user_secret')
    auth_token = db.get_user_auth_token(user_id)

    if (not auth_token) or (user_secret != db.get_user_client_secret(user_id)):
        login_query = urllib.parse.urlencode({
            'client_id': CLIENT_ID,
            'scope': SCOPES,
            'redirect_uri': BASE_URL + url_for('cb'),
            'response_type': 'code'
        })
        resp = make_response(render_template(
            "index_unauthorised.html",
            loginurl=SPOTIFY_AUTHORIZE_ENDPOINT + login_query,
            message=create_empty_message()
        ))
        # A new secret is handed out on every unauthenticated visit; the
        # auth callback later stores it against the Spotify account.
        resp.set_cookie(
            'user_secret',
            str(uuid.uuid4()),
            secure=True,
            expires=time.time() + USER_SECRET_EXPIRATION
        )
        return resp

    if request.method == 'GET':
        return render_template(
            "index_authorised.html",
            message=_message_from_query(request.args.get('message'))
        )

    # POST: create the playlist on Spotify.
    playlist_name = request.form.get('pname')
    count = request.form.get('count')
    public = request.form.get('public') == 'public'
    counttype = request.form.get('counttype')

    create_rq = rq.post(
        f"{SPOTIFY_API_ENDPOINT}/users/{user_id}/playlists",
        headers={'Authorization': f"Bearer {auth_token}"},
        json={
            'name': playlist_name,
            'public': public,
            'collaborative': False,
            # The count and its unit are stored in the playlist description.
            'description': f"{count}{counttype}"
        }
    )

    if create_rq.status_code != 201:
        return render_template("index_authorised.html", message={
            'type': 'error',
            'text': f"unexpected error when creating playlist {playlist_name}"
        })

    # Parse the body only after the status check: an error response is not
    # guaranteed to be JSON, and parsing it first could raise here.
    playlist_id = create_rq.json().get('id')
    db.user_add_playlist(user_id, playlist_id, count, counttype)

    PlaylistUpdater(db, auth_token).update_playlist(playlist_id)

    return render_template("index_authorised.html", message={
        'type': 'info',
        'text': f"created playlist {playlist_name}. it will be populated shortly"
    })
|
|
|
|
|
|
|
|
@app.route('/authcb')
def cb():
    """OAuth redirect target: finish the Spotify login.

    Exchanges the ``code`` query parameter for an access/refresh token pair,
    looks up the user's Spotify id via ``/me``, stores everything in the Db
    keyed to the browser's ``user_secret`` cookie, then redirects to the
    index page with a success message and a ``spotify_id`` cookie.

    Returns a plain-text response when the ``user_secret`` cookie is missing
    or Spotify reported an ``error``, and renders an error message when
    either Spotify request fails.
    """
    user_secret = request.cookies.get('user_secret')
    if not user_secret:
        # This was a plain string, so the literal text "{user_secret=}" was
        # sent to the browser; the f prefix was clearly intended.
        return f'client secret is not existing or something: {user_secret=}'

    error = request.args.get('error')
    if error:
        return 'error'

    def auth_failed():
        # Shared error page for both Spotify calls below.
        return render_template("index_authorised.html", message={
            'type': 'error',
            'text': "unable to authenticate with spotify. please try again later."
        })

    spotify_code = request.args.get('code')
    # HTTP Basic credentials: base64("client_id:client_secret").
    authstring = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode('utf-8')).decode('utf-8')
    data = {
        'code': spotify_code,
        'grant_type': 'authorization_code',
        # Built the same way as the redirect_uri in index's login URL.
        'redirect_uri': BASE_URL + url_for('cb'),
    }
    headers = {
        'Authorization': f'Basic {authstring}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    token_rq = rq.post(f"{SPOTIFY_TOKEN_ENDPOINT}", data=data, headers=headers)
    if token_rq.status_code != rq.codes.ok:
        print(f"getting token failed {token_rq.text=}")
        return auth_failed()

    resp = token_rq.json()
    spotify_token = resp['access_token']
    token_expires = resp['expires_in']
    refresh_token = resp['refresh_token']

    me_rq = rq.get(f"{SPOTIFY_API_ENDPOINT}/me", headers={'Authorization': f"Bearer {spotify_token}"})
    if me_rq.status_code != 200:
        print(f"getting user info failed {me_rq.text=}")
        return auth_failed()
    spotify_id = me_rq.json()['id']

    db.add_user(user_secret, spotify_token, spotify_id, token_expires, refresh_token, client_secret_expires=USER_SECRET_EXPIRATION)

    resp = make_response(redirect(url_for('index', message=json.dumps({
        'type': 'info',
        'text': "successfully logged in with spotify"
    }))))
    resp.set_cookie('spotify_id', spotify_id)
    return resp
|
|
|
|
@app.route('/logout')
def logout():
    """Log the browser out and send it back to the index page.

    Only the ``user_secret`` cookie is overwritten (with an empty value);
    the ``spotify_id`` cookie is left as it is.
    """
    landing_url = url_for('index')
    response = make_response(redirect(landing_url))
    response.set_cookie('user_secret', value='')
    return response
|
|
|
|
@app.route('/manage')
def manage():
    """List the user's managed playlists and handle delete/unlink requests.

    Query parameters:
        playlist_id: playlist to act on; when absent the list is rendered.
        action: ``delete`` (unfollow the playlist on Spotify, then remove it
            from the Db) or ``unlink`` (remove it from the Db only).
        playlist_name: display name used in the result message.
        message: optional JSON-encoded message dict to show on the list page.

    Unauthenticated requests are redirected to the index page.
    """
    if not isauth(request.cookies):
        return redirect(url_for('index'))

    user_id = request.cookies.get('spotify_id')
    playlist_id = request.args.get('playlist_id')
    action = request.args.get('action')
    # Read up front: the failure redirect below used this name before it was
    # assigned, raising UnboundLocalError whenever a delete failed.
    playlist_name = request.args.get('playlist_name')

    def gen_populated_response(message=False):
        """Render manage.html with the id and name of each managed playlist."""
        session = FuturesSession()
        ids = db.get_user_playlists(user_id)
        headers = {'Authorization': f'Bearer {db.get_user_auth_token(user_id)}'}

        # Start every lookup first, then collect, so the requests overlap.
        futures = [
            session.get(f"{SPOTIFY_API_ENDPOINT}/playlists/{pl_id}?fields=id,name", headers=headers)
            for pl_id in ids
        ]

        playlists = []
        for future in futures:
            resp = future.result()
            if resp.status_code != 200:
                # fail silently for now...
                # TODO maybe report this error later to the user, log
                print(f"NONZERO STATUS CODE: {resp.status_code=}")
                print(f"{resp.content=}")
                continue
            playlists.append(resp.json())

        # NOTE(review): the original indentation was lost; this nesting lets
        # an explicitly passed message take precedence over the query
        # parameter. Confirm against the upstream file.
        if not message:
            message = create_empty_message()
            if len(ids) == 0:
                message['type'] = 'info'
                message['text'] = 'no playlists managed by heartbeats'
            message_str = request.args.get('message')
            if message_str:
                try:
                    message = json.loads(message_str)
                except ValueError:
                    # The query string is user-controlled; keep the default
                    # message rather than failing on malformed JSON.
                    pass

        return render_template("manage.html", message=message, playlists=playlists)

    if not playlist_id:
        return gen_populated_response()

    if action in ['delete', 'unlink']:
        actioned = 'deleted' if action == 'delete' else 'unlinked'
        if action == 'delete':
            # "Deleting" a playlist here means unfollowing it on Spotify.
            headers = {'Authorization': f'Bearer {db.get_user_auth_token(user_id)}'}
            resp = rq.delete(
                f"{SPOTIFY_API_ENDPOINT}/playlists/{playlist_id}/followers",
                headers=headers
            )
            print(resp.content)
            if resp.status_code != 200:
                return redirect(url_for('manage', message=json.dumps({
                    'type': 'error',
                    'text': f'unable to {action} playlist {playlist_name}'
                })))

        # Both actions stop managing the playlist locally.
        db.user_del_playlist(user_id, playlist_id)
        return redirect(url_for('manage', message=json.dumps({
            'type': 'info',
            'text': f'successfully {actioned} playlist {playlist_name}'
        })))

    return gen_populated_response(message={
        'type': 'error',
        'text': f'action not recognised: {action}'
    })
|