import json
import logging
import os
import pprint
import subprocess

import requests
from flask import Flask, request

from utilfuncs import Utils
from envvars import (
    CLIENTSEC,
    SECRET,
    USER_ID,
    SAVE_DIR,
    BASE,
    LOGLEVEL,
    ALLFILES,
    ARTIST,
)

# Instantiating libraries and other parameters
app = Flask(__name__)
pp = pprint.PrettyPrinter(indent=4)
app.json.compact = False


# This function is for production.
# @app.route("/webhook", methods=["POST"])
# def accept_webhook():
#     logging.info("Webhook page prepared and ready to receive webhook.")
#     if request.method == "POST":
#         logging.info("POST call received")
#         payload = json.loads(request.json)
#         logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
#         try:
#             service_date, webhook_ids = parse_payload(payload)
#             if service_date:
#                 service_id, plan_id = get_my_plans(webhook_ids)
#                 if service_id is None or plan_id is None:
#                     pass
#                 else:
#                     return get_plans(service_id, plan_id, service_date)
#         except TypeError as e:
#             print(e)
#         finally:
#             return "End of app"


# This function is for testing and doesn't compare the user's plan/service ids to the webhook.
# TODO: Use the parts of this function that skip over `get_my_plans` as part of a env variable to allow users to download music outside of their assigned service and plans.
@app.route("/webhook", methods=["POST"])
def accept_webhook():
    """Handle an incoming webhook and download the audio for its plan.

    The plan and service-type ids are taken straight from the webhook (they
    are not compared against the user's own plans via ``get_my_plans``).

    Returns:
        ``"Try loop completed"`` when the plan was processed, otherwise
        ``"End of app"`` (nothing to do, or processing failed and was logged).
    """
    logging.info("Webhook page prepared and ready to receive webhook.")
    # The route is registered for POST only, so no method check is required.
    logging.info("POST call received")

    # Kept outside the ``try`` so Flask's own HTTP errors (bad/missing JSON)
    # still reach the client instead of being swallowed below.
    payload = request.json

    try:
        # ``request.json`` is already decoded; only a double-encoded body
        # (a JSON string containing JSON) needs a second pass.
        if isinstance(payload, str):
            payload = json.loads(payload)
        logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")

        service_date, webhook_ids = parse_payload(payload)
        if not service_date:
            return "End of app"

        # Webhook IDs tuple is (plan id, service type id).
        plan_id, service_id = webhook_ids
        get_plans(service_id, plan_id, service_date)
        return "Try loop completed"
    except Exception:
        # Top-level boundary: stay best-effort (always answer the sender) but
        # record the failure instead of discarding it silently.
        logging.exception("Failed to process webhook")
        return "End of app"


# Seconds to wait on any outbound HTTP call before giving up.
_REQUEST_TIMEOUT = 30


def _pco_json(url, method="get"):
    """Call the API at ``url`` with the configured credentials and return its JSON body.

    Raises:
        requests.RequestException: on timeout, connection failure or a
            non-success HTTP status.
    """
    response = requests.request(
        method, url, auth=(CLIENTSEC, SECRET), timeout=_REQUEST_TIMEOUT
    )
    response.raise_for_status()
    return response.json()


def parse_payload(payload):
    """Extract the service date and ids from a webhook payload.

    Only the first entry of ``payload["data"]`` is examined.

    Args:
        payload: Decoded webhook body (a dict with a ``"data"`` list).

    Returns:
        ``(service_date, (plan_id, service_type_id))`` when the plan should be
        processed, or ``(None, None)`` when there is no event or the service
        date is in the past.
    """
    first = next(iter(payload["data"]), None)
    if first is None:
        return None, None

    # event_type lives at first['attributes']['name']
    inner = first["attributes"]["payload"]
    # NOTE(review): the inner payload may arrive as a JSON string rather than
    # a dict - confirm against a real delivery. Dicts are used as-is.
    if isinstance(inner, str):
        inner = json.loads(inner)
    plan = inner["data"]

    # Webhook IDs tuple are: (plan id, service id)
    webhook_ids = (
        plan["id"],
        plan["relationships"]["service_type"]["data"]["id"],
    )
    service_date = plan["attributes"]["dates"]

    # This is just for testing purposes as the date had multiple rehearsal tracks
    if service_date == "June 15, 2025":
        print(f"Special consideration for {service_date}")
    elif not Utils.is_future_date(service_date):
        # Check if service date is in the future before proceeding
        logging.info(f"Service date {service_date} is in the past. Exiting.")
        return None, None
    return service_date, webhook_ids


def get_my_plans(incoming_ids):
    """Check whether the webhook's plan is one of the user's own plans.

    Args:
        incoming_ids: ``(plan_id, service_type_id)`` as built by
            ``parse_payload``.

    Returns:
        ``(service_type_id, plan_id)`` when the pair appears among the user's
        plan_people entries, otherwise ``(None, None)``.
    """
    # This is the url for all the plans the user is part of
    url = f"{BASE}/services/v2/people/{USER_ID}/plan_people"
    # TODO: Move all calls to the utils file
    data = _pco_json(url)

    # Each entry is (plan id, service type id), the same order as the
    # incoming webhook ids. Service Type is a parent of the plan; plans exist
    # within services.
    plan_ids = {
        (
            entry["relationships"]["plan"]["data"]["id"],
            entry["relationships"]["service_type"]["data"]["id"],
        )
        for entry in data["data"]
    }

    if tuple(incoming_ids) in plan_ids:
        return incoming_ids[1], incoming_ids[0]

    logging.info("Plan doesn't exist in PlanPerson payloads")
    return None, None


def get_plans(service_type_id, plan_id, service_date):
    """Fetch the items of a plan and hand them on to ``get_file_key``."""
    planurl = (
        f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/items"
    )
    reqdata = _pco_json(planurl)
    logging.debug(reqdata)
    get_file_key(reqdata, service_type_id, plan_id, planurl, service_date)


def get_file_key(reqdata, service_type_id, plan_id, planurl, service_date):
    """Build a ``"<title> - <key>"`` name for every song item, then fetch attachments.

    Args:
        reqdata: JSON body of the plan's items listing.
        planurl: Items URL; ``/<item id>/key`` is appended per song.
    """
    filenames = []
    for item in reqdata["data"]:
        if item["attributes"]["item_type"] != "song":
            continue
        getkey = _pco_json(f"{planurl}/{item['id']}/key")
        key = getkey["data"]["attributes"]["starting_key"]
        filenames.append(f"{item['attributes']['title']} - {key}")

    print(f"Here are the filenames: {filenames}")
    get_attachments(filenames, service_type_id, plan_id, service_date)


def get_attachments(filenames, service_type_id, plan_id, service_date):
    """Download the plan's audio attachments that permit downloading.

    With ``ALLFILES`` set to ``"False"`` each attachment is saved under the
    closest match from ``filenames``; with ``"True"`` it is saved under its
    own filename. Any other value downloads nothing. The comparison ignores
    case and surrounding whitespace.

    Returns:
        The fixed string ``"Returning this statement."``.
    """
    # The payload is printed below so please check it to see what your church
    # will/won't allow.
    planattachments = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/all_attachments"
    plana = _pco_json(planattachments)
    pp.pprint(plana)
    logging.info("Looking for attachments")

    if not plana["data"]:
        logging.info(f"No attachments available for {service_date}")
        return "Returning this statement."
    logging.info(f"Attachments available for {service_date}")

    mode = str(ALLFILES).strip().lower()
    non_downloadables = []
    for attachment in plana["data"]:
        attributes = attachment["attributes"]
        content_type = attributes["content_type"]
        if content_type is None or "audio" not in content_type:
            continue

        file = attributes["filename"]
        if not attributes["allow_mp3_download"]:
            # Audio file the account does not allow us to download.
            non_downloadables.append(file)
            continue

        if mode == "false":
            print(f"Grabbing only track name matches as ALLFILES is {ALLFILES}")
            print(file)
            print(filenames)
            target = Utils.find_close_matches(file, filenames)
            # TODO: Figure out how to run the close matches with non-obvious file/content names
            if not target:
                logging.info(f"No close match found for {file}; skipping")
                continue
        elif mode == "true":
            print(f"Grabbing all files as ALLFILES is {ALLFILES}")
            target = file
        else:
            continue

        # One broken attachment should not stop the remaining downloads.
        try:
            get_download_url(attachment["id"], target, ALLFILES)
        except (requests.RequestException, KeyError, OSError):
            logging.exception(f"Could not download attachment {file}")

    if non_downloadables:
        logging.info(
            f"The following audio content-types files do not allow for downloading: {non_downloadables}"
        )
    return "Returning this statement."


def get_download_url(uuid, file, allfiles):
    """Resolve an attachment's download URL, save the file and tag it.

    Has not been fully tested yet.
    To get the s3 bucket download url, this is a sample curl url:
    curl -u client_sec:secret_secret -X POST https://api.planningcenteronline.com/services/v2/media/media_id/attachments/attachment_id/open

    Args:
        uuid: Attachment id.
        file: Name to save the download under (relative to the working
            directory). A list/tuple is accepted and its first element used.
        allfiles: Unused; kept so existing callers keep working.

    Returns:
        ``"Successfully downloaded"`` or
        ``"Something went wrong with the download"``.
    """
    # NOTE(review): the close-match helper may hand back a list of names -
    # confirm. Writing to str(list) would create a file literally named "[...]".
    if isinstance(file, (list, tuple)):
        if not file:
            return "Something went wrong with the download"
        file = file[0]
    file = str(file)

    downdata = _pco_json(f"{BASE}/services/v2/attachments/{uuid}/open", method="post")
    s3url = downdata["data"]["attributes"]["attachment_url"]

    s3response = requests.get(s3url, timeout=_REQUEST_TIMEOUT)
    print(s3response.status_code)
    if s3response.status_code != 200:
        return "Something went wrong with the download"

    # NOTE(review): SAVE_DIR is imported by this module but not used here;
    # files land in the current working directory.
    with open(file, "wb") as f:
        f.write(s3response.content)
    edit_metadata(file)
    return "Successfully downloaded"


def edit_metadata(file):
    """Tag ``file`` with genre, album, artist and title using the ``id3v2`` CLI.

    The title is set to the file name itself. A missing ``id3v2`` binary is
    logged and otherwise ignored so the download itself still counts.
    """
    file = str(file)
    print(file)
    print("******")
    command = [
        "id3v2",
        "-g",
        "Christian",  # TODO: Add environmental variable for people to change their own genre.
        "-A",
        "PCO",
        "-a",
        ARTIST,
        "-t",
        file,
        file,
    ]
    try:
        subprocess.call(command)
    except FileNotFoundError:
        logging.warning(f"id3v2 is not installed; skipping metadata for {file}")


if __name__ == "__main__":
    numeric_level = getattr(logging, LOGLEVEL.upper(), None)
    logging.basicConfig(filename="record.log", level=numeric_level)
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the
    # Werkzeug debugger to the network - do not use this outside local testing.
    app.run(host="0.0.0.0", port=8000, debug=True)