246 lines
9.1 KiB
Python
246 lines
9.1 KiB
Python
import requests
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import pprint
|
|
from flask import Flask, request
|
|
from utilfuncs import Utils
|
|
from envvars import (
|
|
CLIENTSEC,
|
|
SECRET,
|
|
USER_ID,
|
|
SAVE_DIR,
|
|
BASE,
|
|
LOGLEVEL,
|
|
ALLFILES,
|
|
ARTIST,
|
|
)
|
|
|
|
# Instantiating libraries and other parameters
|
|
app = Flask(__name__)
|
|
pp = pprint.PrettyPrinter(indent=4)
|
|
app.json.compact = False
|
|
|
|
# This function is for production.
|
|
# @app.route("/webhook", methods=["POST"])
|
|
# def accept_webhook():
|
|
# logging.info("Webhook page prepared and ready to receive webhook.")
|
|
# if request.method == "POST":
|
|
# logging.info("POST call received")
|
|
# payload = json.loads(request.json)
|
|
# logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
|
|
# try:
|
|
# service_date, webhook_ids = parse_payload(payload)
|
|
# if service_date:
|
|
# service_id, plan_id = get_my_plans(webhook_ids)
|
|
# if service_id is None or plan_id is None:
|
|
# pass
|
|
# else:
|
|
# return get_plans(service_id, plan_id, service_date)
|
|
# except TypeError as e:
|
|
# print(e)
|
|
# finally:
|
|
# return "End of app"
|
|
|
|
|
|
# This function is for testing and doesn't compare the user's plan/service ids to the webhook.
|
|
# TODO: Use the parts of this function that skip over `get_my_plans` as part of a env variable to allow users to download music outside of their assigned service and plans.
|
|
@app.route("/webhook", methods=["POST"])
|
|
def accept_webhook():
|
|
logging.info("Webhook page prepared and ready to receive webhook.")
|
|
if request.method == "POST":
|
|
logging.info("POST call received")
|
|
payload = json.loads(request.json)
|
|
logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
|
|
try:
|
|
service_date, webhook_ids = parse_payload(payload)
|
|
if service_date:
|
|
service_id = webhook_ids[1]
|
|
plan_id = webhook_ids[0]
|
|
get_plans(service_id, plan_id, service_date)
|
|
return "Try loop completed"
|
|
except TypeError as e:
|
|
print(service_id, plan_id)
|
|
print("This is a type error")
|
|
print(e)
|
|
finally:
|
|
return "End of app"
|
|
|
|
|
|
def parse_payload(payload):
|
|
for item in payload["data"]:
|
|
# event_type lives at item['attributes']['name']
|
|
webhook_ids = (
|
|
item["attributes"]["payload"]["data"]["id"],
|
|
item["attributes"]["payload"]["data"]["relationships"]["service_type"][
|
|
"data"
|
|
]["id"],
|
|
)
|
|
# Webhook IDs tuple are: (plan id, service id)
|
|
service_date = item["attributes"]["payload"]["data"]["attributes"]["dates"]
|
|
|
|
# This is just for testing purposes as the date had multiple rehearsal tracks
|
|
if service_date == "June 15, 2025":
|
|
print(f"Special consideration for {service_date}")
|
|
else:
|
|
# Check if service date is in the future before proceeding
|
|
if not Utils.is_future_date(service_date):
|
|
logging.info(f"Service date {service_date} is in the past. Exiting.")
|
|
return f"Service date {service_date} is in the past. Exiting."
|
|
return service_date, webhook_ids
|
|
|
|
|
|
def get_my_plans(incoming_ids):
|
|
"""
|
|
This is the url for all the plans the user is part of
|
|
"""
|
|
url = f"{BASE}/services/v2/people/{USER_ID}/plan_people"
|
|
# TODO: Move all calls to the utils file
|
|
data = requests.get(url, auth=(CLIENTSEC, SECRET)).json()
|
|
plan_ids = []
|
|
for items in data["data"]:
|
|
plan_ids.append(
|
|
(
|
|
items["relationships"]["plan"]["data"]["id"],
|
|
items["relationships"]["service_type"]["data"]["id"],
|
|
)
|
|
)
|
|
# In Tuple, index 0 is the service_type_id and index 1 is the plan_id
|
|
# Service Type is a parent of the plan. Plans exist within services
|
|
if len(plan_ids) > 1:
|
|
for plan in plan_ids:
|
|
if plan == incoming_ids:
|
|
return incoming_ids[1], incoming_ids[0]
|
|
else:
|
|
return "Plan doesn't exist in PlanPerson payloads"
|
|
else:
|
|
if plan_ids[0] == incoming_ids:
|
|
return incoming_ids[1], incoming_ids[0]
|
|
else:
|
|
return "Plan doesn't exist in PlanPerson payloads"
|
|
|
|
|
|
def get_plans(service_type_id, plan_id, service_date):
|
|
planurl = (
|
|
f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/items"
|
|
)
|
|
reqdata = requests.get(planurl, auth=(CLIENTSEC, SECRET)).json()
|
|
logging.debug(reqdata)
|
|
get_file_key(reqdata, service_type_id, plan_id, planurl, service_date)
|
|
|
|
|
|
def get_file_key(reqdata, service_type_id, plan_id, planurl, service_date):
|
|
filenames = []
|
|
for items in reqdata["data"]:
|
|
if items["attributes"]["item_type"] == "song":
|
|
item_id = items["id"]
|
|
getkey = requests.get(
|
|
f"{planurl}/{item_id}/key", auth=(CLIENTSEC, SECRET)
|
|
).json()
|
|
key = getkey["data"]["attributes"]["starting_key"]
|
|
filename = f"{items['attributes']['title']} - {key}"
|
|
filenames.append(filename)
|
|
|
|
print(f"Here are the filenames: {filenames}")
|
|
get_attachments(filenames, service_type_id, plan_id, service_date)
|
|
|
|
|
|
def get_attachments(filenames, service_type_id, plan_id, service_date):
|
|
# I've added the payload to logging so please check this payload to see what your church will/won't allow.
|
|
planattachments = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/all_attachments"
|
|
plana = requests.get(planattachments, auth=(CLIENTSEC, SECRET)).json()
|
|
pp.pprint(plana)
|
|
logging.info("Looking for attachments")
|
|
non_downloadables = []
|
|
if plana["data"] == []:
|
|
logging.info(f"No attachments available for {service_date}")
|
|
else:
|
|
logging.info(f"Attachments available for {service_date}")
|
|
for attachments in plana["data"]:
|
|
if attachments["attributes"]["allow_mp3_download"]:
|
|
if (
|
|
attachments["attributes"]["content_type"] is not None
|
|
and "audio" in attachments["attributes"]["content_type"]
|
|
):
|
|
attachment_id = attachments["id"]
|
|
file = attachments["attributes"]["filename"]
|
|
if ALLFILES == "False":
|
|
print(
|
|
f"Grabbing only track name matches as ALLFILES is {ALLFILES}"
|
|
)
|
|
print(file)
|
|
print(filenames)
|
|
close_match = Utils.find_close_matches(file, filenames)
|
|
# TODO: Figure out how to run the close matches with non-obvious file/content names
|
|
get_download_url(attachment_id, close_match, ALLFILES)
|
|
elif ALLFILES == "True":
|
|
print(f"Grabbing all files as ALLFILES is {ALLFILES}")
|
|
get_download_url(attachment_id, file, ALLFILES)
|
|
else:
|
|
continue
|
|
else:
|
|
non_downloadables.append(attachments["attributes"]["filename"])
|
|
continue
|
|
if len(non_downloadables) > 0:
|
|
logging.info(
|
|
f"The following audio content-types files do not allow for downloading: {non_downloadables}"
|
|
)
|
|
return "Returning this statement."
|
|
|
|
|
|
def get_download_url(uuid, file, allfiles):
|
|
"""
|
|
Has not been fully tested yet.
|
|
To get the s3 bucket download url, this is a sample curl url:
|
|
curl -u client_sec:secret_secret -X POST https://api.planningcenteronline.com/services/v2/media/media_id/attachments/attachment_id/open
|
|
"""
|
|
downurl = f"{BASE}/services/v2/attachments/{uuid}/open"
|
|
downreq = requests.post(downurl, auth=(CLIENTSEC, SECRET))
|
|
downdata = downreq.json()
|
|
s3url = downdata["data"]["attributes"]["attachment_url"]
|
|
|
|
s3response = requests.get(s3url)
|
|
print(s3response.status_code)
|
|
if s3response.status_code == 200:
|
|
with open(f"{str(file)}", "wb") as f:
|
|
f.write(s3response.content)
|
|
# if allfiles == "False":
|
|
# # with open(f"{str(file)[2:-2]}.mp3", "wb") as f:
|
|
# with open(f"{str(file)}", "wb") as f:
|
|
# f.write(s3response.content)
|
|
# else:
|
|
# with open(f"{str(file)}", "wb") as f:
|
|
# f.write(s3response.content)
|
|
|
|
edit_metadata(file)
|
|
return "Successfully downloaded"
|
|
else:
|
|
return "Something went wrong with the download"
|
|
|
|
|
|
def edit_metadata(file):
|
|
print(file)
|
|
print("******")
|
|
command = [
|
|
"id3v2",
|
|
"-g",
|
|
"Christian",
|
|
# TODO: Add environmental variable for people to change their own genre.
|
|
"-A",
|
|
"PCO",
|
|
"-a",
|
|
ARTIST,
|
|
"-t",
|
|
file,
|
|
file,
|
|
]
|
|
subprocess.call(command)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import logging
|
|
|
|
numeric_level = getattr(logging, LOGLEVEL.upper(), None)
|
|
logging.basicConfig(filename="record.log", level=numeric_level)
|
|
app.run(host="0.0.0.0", port=8000, debug=True)
|