# File: planning-center-to-local-pl…/main.py (246 lines, 9.1 KiB, Python)

import json
import logging
import os
import pprint
import subprocess

import requests
from flask import Flask, request

from envvars import (
    CLIENTSEC,
    SECRET,
    USER_ID,
    SAVE_DIR,
    BASE,
    LOGLEVEL,
    ALLFILES,
    ARTIST,
)
from utilfuncs import Utils
# Module-level objects shared by the handlers in this file.
# Flask application; the /webhook route is registered on it.
app = Flask(__name__)
# Pretty-printer (4-space indent) for rendering raw API payloads while debugging.
pp = pprint.PrettyPrinter(indent=4)
# Switch off the compact mode of this app's Flask JSON provider.
app.json.compact = False
# This function is for production.
# @app.route("/webhook", methods=["POST"])
# def accept_webhook():
# logging.info("Webhook page prepared and ready to receive webhook.")
# if request.method == "POST":
# logging.info("POST call received")
# payload = json.loads(request.json)
# logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
# try:
# service_date, webhook_ids = parse_payload(payload)
# if service_date:
# service_id, plan_id = get_my_plans(webhook_ids)
# if service_id is None or plan_id is None:
# pass
# else:
# return get_plans(service_id, plan_id, service_date)
# except TypeError as e:
# print(e)
# finally:
# return "End of app"
# Testing variant of the webhook handler: it does not compare the webhook's plan/service ids with the user's own plans.
# TODO: Put the `get_my_plans` bypass used here behind an environment variable so users can opt in to downloading music outside of their assigned services and plans.
@app.route("/webhook", methods=["POST"])
def accept_webhook():
    """Handle one webhook delivery and download the audio for its plan.

    Testing variant: the plan id and service-type id are taken straight from
    the webhook payload and are not checked against the user's own plans.

    Returns:
        str: "Try loop completed" when the plan was handed to ``get_plans``
        without error, otherwise "End of app". A body is always returned so
        the sender gets a response even when processing fails.
    """
    logging.info("Webhook page prepared and ready to receive webhook.")
    logging.info("POST call received")

    payload = request.json
    # request.json is already decoded. Only a double-encoded body (a JSON
    # string that itself contains JSON) needs a second decoding pass.
    if isinstance(payload, str):
        payload = json.loads(payload)
    logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")

    try:
        parsed = parse_payload(payload)
        # parse_payload may hand back a message or None instead of a pair;
        # treat anything that is not a 2-tuple as "nothing to do".
        if not isinstance(parsed, tuple) or len(parsed) != 2:
            logging.info("Webhook skipped: %s", parsed)
            return "End of app"

        service_date, webhook_ids = parsed
        if not service_date:
            return "End of app"

        # webhook_ids is (plan id, service type id).
        plan_id, service_id = webhook_ids
        get_plans(service_id, plan_id, service_date)
        return "Try loop completed"
    except Exception:
        # Top-level boundary: record the full traceback instead of silently
        # discarding it, but still answer the webhook sender.
        logging.exception("Webhook processing failed")
        return "End of app"
def parse_payload(payload):
    """Pull the service date and ids out of a webhook delivery.

    Only the first event in ``payload["data"]`` is evaluated; the function
    returns as soon as that event has been inspected.

    Args:
        payload: Decoded webhook body. Each entry of ``payload["data"]`` must
            carry the plan under ``["attributes"]["payload"]["data"]``.

    Returns:
        tuple: ``(service_date, (plan_id, service_type_id))`` when the event
        should be processed, or ``(None, None)`` when there is no event or the
        service date is in the past. Callers unpack two values and test
        ``service_date`` for truthiness, so a pair is returned in every case.
    """
    for item in payload["data"]:
        # event_type lives at item['attributes']['name']
        plan = item["attributes"]["payload"]["data"]
        # Webhook IDs tuple is: (plan id, service type id)
        webhook_ids = (
            plan["id"],
            plan["relationships"]["service_type"]["data"]["id"],
        )
        service_date = plan["attributes"]["dates"]
        # This is just for testing purposes as the date had multiple rehearsal tracks
        if service_date == "June 15, 2025":
            print(f"Special consideration for {service_date}")
        elif not Utils.is_future_date(service_date):
            # Past services are skipped rather than downloaded.
            logging.info(f"Service date {service_date} is in the past. Exiting.")
            return None, None
        return service_date, webhook_ids

    # Empty delivery: nothing to process.
    return None, None
def get_my_plans(incoming_ids):
    """Check whether the webhook's plan is one the configured user is part of.

    Args:
        incoming_ids: ``(plan_id, service_type_id)`` pair taken from the
            webhook payload.

    Returns:
        tuple: ``(service_type_id, plan_id)`` when the pair appears among the
        user's plan_people records, otherwise ``(None, None)``.
    """
    # This is the url for all the plans the user is part of
    url = f"{BASE}/services/v2/people/{USER_ID}/plan_people"
    # TODO: Move all calls to the utils file
    data = requests.get(url, auth=(CLIENTSEC, SECRET)).json()

    # Each pair is (plan_id, service_type_id), the same order as incoming_ids.
    # Service Type is a parent of the plan. Plans exist within services.
    my_plans = {
        (
            entry["relationships"]["plan"]["data"]["id"],
            entry["relationships"]["service_type"]["data"]["id"],
        )
        for entry in data["data"]
    }

    # Membership test covers zero, one or many plans without special cases.
    if tuple(incoming_ids) in my_plans:
        return incoming_ids[1], incoming_ids[0]

    logging.info("Plan doesn't exist in PlanPerson payloads")
    return None, None
def get_plans(service_type_id, plan_id, service_date):
    """Fetch the item list of one plan and hand it to ``get_file_key``.

    The items URL is passed along as well because ``get_file_key`` builds its
    per-item requests on top of it.
    """
    items_url = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/items"
    response = requests.get(items_url, auth=(CLIENTSEC, SECRET))
    plan_items = response.json()
    logging.debug(plan_items)
    get_file_key(
        reqdata=plan_items,
        service_type_id=service_type_id,
        plan_id=plan_id,
        planurl=items_url,
        service_date=service_date,
    )
def get_file_key(reqdata, service_type_id, plan_id, planurl, service_date):
    """Build a "<title> - <starting key>" name for every song in the plan.

    For each item whose ``item_type`` is "song", the item's key endpoint under
    ``planurl`` is queried for its starting key. The collected names are
    printed and then passed to ``get_attachments``.
    """
    credentials = (CLIENTSEC, SECRET)
    song_names = []
    for entry in reqdata["data"]:
        details = entry["attributes"]
        # Only songs have a key worth looking up.
        if details["item_type"] != "song":
            continue
        key_url = f"{planurl}/{entry['id']}/key"
        key_payload = requests.get(key_url, auth=credentials).json()
        starting_key = key_payload["data"]["attributes"]["starting_key"]
        song_names.append(f"{details['title']} - {starting_key}")
    print(f"Here are the filenames: {song_names}")
    get_attachments(song_names, service_type_id, plan_id, service_date)
def get_attachments(filenames, service_type_id, plan_id, service_date):
    """Download the plan's audio attachments that allow downloading.

    Args:
        filenames: "<title> - <key>" names of the plan's songs; used to pick
            matching attachments when ALLFILES is "False".
        service_type_id: Service type id used to build the attachments URL.
        plan_id: Plan id used to build the attachments URL.
        service_date: Service date; only used in log messages.

    Returns:
        str: Always the fixed string "Returning this statement."
    """
    # The raw payload is pretty-printed below; check it to see what your church will/won't allow.
    planattachments = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/all_attachments"
    plana = requests.get(planattachments, auth=(CLIENTSEC, SECRET)).json()
    pp.pprint(plana)
    logging.info("Looking for attachments")

    attachments = plana["data"]
    if not attachments:
        logging.info(f"No attachments available for {service_date}")
    else:
        logging.info(f"Attachments available for {service_date}")

    non_downloadables = []
    for attachment in attachments:
        attributes = attachment["attributes"]
        if not attributes["allow_mp3_download"]:
            non_downloadables.append(attributes["filename"])
            continue

        content_type = attributes["content_type"]
        if content_type is None or "audio" not in content_type:
            continue

        attachment_id = attachment["id"]
        file = attributes["filename"]
        if ALLFILES == "False":
            print(f"Grabbing only track name matches as ALLFILES is {ALLFILES}")
            print(file)
            print(filenames)
            close_match = Utils.find_close_matches(file, filenames)
            # NOTE(review): find_close_matches appears to return a list of
            # candidate names (possibly empty) - confirm against utilfuncs.
            # Use the best candidate as the file name, not the list itself.
            if isinstance(close_match, (list, tuple)):
                close_match = close_match[0] if close_match else None
            if not close_match:
                logging.info(f"No close match for {file}; skipping download")
                continue
            # TODO: Figure out how to run the close matches with non-obvious file/content names
            get_download_url(attachment_id, close_match, ALLFILES)
        elif ALLFILES == "True":
            print(f"Grabbing all files as ALLFILES is {ALLFILES}")
            get_download_url(attachment_id, file, ALLFILES)

    if non_downloadables:
        logging.info(
            f"The following audio content-types files do not allow for downloading: {non_downloadables}"
        )
    return "Returning this statement."
def get_download_url(uuid, file, allfiles):
    """Download one attachment, save it locally and tag it.

    Has not been fully tested yet.
    To get the s3 bucket download url, this is a sample curl url:
    curl -u client_sec:secret_secret -X POST https://api.planningcenteronline.com/services/v2/media/media_id/attachments/attachment_id/open

    Args:
        uuid: Id of the attachment to open.
        file: Name to save the download under. A list or tuple of candidate
            names is also accepted; its first entry is used.
        allfiles: Unused; kept so existing callers keep working.

    Returns:
        str: "Successfully downloaded" on success, otherwise
        "Something went wrong with the download".
    """
    failure = "Something went wrong with the download"

    # Callers may pass a list of close matches; never use its repr as a name.
    if isinstance(file, (list, tuple)):
        if not file:
            return failure
        file = file[0]

    # Path separators inside a title would make open() fail or write outside
    # the working directory, so flatten them.
    filename = str(file)
    for separator in filter(None, (os.sep, os.altsep)):
        filename = filename.replace(separator, "_")

    downurl = f"{BASE}/services/v2/attachments/{uuid}/open"
    downreq = requests.post(downurl, auth=(CLIENTSEC, SECRET))
    if downreq.status_code >= 400:
        # An error body has no data/attributes to index into.
        logging.error(f"Opening attachment {uuid} failed with status {downreq.status_code}")
        return failure

    s3url = downreq.json()["data"]["attributes"]["attachment_url"]
    s3response = requests.get(s3url)
    print(s3response.status_code)
    if s3response.status_code != 200:
        return failure

    with open(filename, "wb") as f:
        f.write(s3response.content)
    edit_metadata(filename)
    return "Successfully downloaded"
def edit_metadata(file):
    """Tag a downloaded file by running the external ``id3v2`` tool.

    Sets the genre (-g) to "Christian", the album (-A) to "PCO", the artist
    (-a) to ARTIST and the title (-t) to the file name itself.

    Tagging is best-effort: the file is already on disk when this runs, so a
    missing ``id3v2`` binary or a failing run is logged and does not raise.

    Args:
        file: Path of the file to tag.
    """
    print(file)
    print("******")
    command = [
        "id3v2",
        "-g",
        "Christian",
        # TODO: Add environmental variable for people to change their own genre.
        "-A",
        "PCO",
        "-a",
        ARTIST,
        "-t",
        file,
        file,
    ]
    try:
        exit_code = subprocess.call(command)
    except FileNotFoundError:
        logging.error(f"id3v2 is not installed; skipping tags for {file}")
        return
    if exit_code != 0:
        logging.warning(f"id3v2 exited with status {exit_code} for {file}")
if __name__ == "__main__":
    # Resolve LOGLEVEL (e.g. "debug") to the logging module's numeric level.
    numeric_level = getattr(logging, LOGLEVEL.upper(), None)
    if not isinstance(numeric_level, int):
        # Unknown name, or a logging attribute that is not a level: let
        # basicConfig keep the root logger's default level instead of crashing.
        numeric_level = None
    logging.basicConfig(filename="record.log", level=numeric_level)
    if numeric_level is None:
        logging.warning(f"Unrecognised LOGLEVEL {LOGLEVEL!r}; using the default level")
    # NOTE(review): debug=True turns on Flask's debugger and reloader while the
    # server listens on all interfaces - confirm this is never used beyond
    # local testing.
    app.run(host="0.0.0.0", port=8000, debug=True)