Files

246 lines
9.1 KiB
Python
Raw Permalink Normal View History

import requests
import os
import subprocess
import json
import pprint
from flask import Flask, request
from utilfuncs import Utils
from envvars import (
CLIENTSEC,
SECRET,
USER_ID,
SAVE_DIR,
BASE,
LOGLEVEL,
ALLFILES,
ARTIST,
)
# Instantiating libraries and other parameters
app = Flask(__name__)
pp = pprint.PrettyPrinter(indent=4)
app.json.compact = False
# This function is for production.
# @app.route("/webhook", methods=["POST"])
# def accept_webhook():
# logging.info("Webhook page prepared and ready to receive webhook.")
# if request.method == "POST":
# logging.info("POST call received")
# payload = json.loads(request.json)
# logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
# try:
# service_date, webhook_ids = parse_payload(payload)
# if service_date:
# service_id, plan_id = get_my_plans(webhook_ids)
# if service_id is None or plan_id is None:
# pass
# else:
# return get_plans(service_id, plan_id, service_date)
# except TypeError as e:
# print(e)
# finally:
# return "End of app"
# This function is for testing and doesn't compare the user's plan/service ids to the webhook.
# TODO: Use the parts of this function that skip over `get_my_plans` as part of a env variable to allow users to download music outside of their assigned service and plans.
@app.route("/webhook", methods=["POST"])
def accept_webhook():
logging.info("Webhook page prepared and ready to receive webhook.")
if request.method == "POST":
logging.info("POST call received")
payload = json.loads(request.json)
logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
try:
service_date, webhook_ids = parse_payload(payload)
if service_date:
service_id = webhook_ids[1]
plan_id = webhook_ids[0]
get_plans(service_id, plan_id, service_date)
return "Try loop completed"
except TypeError as e:
print(service_id, plan_id)
print("This is a type error")
print(e)
finally:
return "End of app"
def parse_payload(payload):
for item in payload["data"]:
# event_type lives at item['attributes']['name']
webhook_ids = (
item["attributes"]["payload"]["data"]["id"],
item["attributes"]["payload"]["data"]["relationships"]["service_type"][
"data"
]["id"],
)
# Webhook IDs tuple are: (plan id, service id)
service_date = item["attributes"]["payload"]["data"]["attributes"]["dates"]
# This is just for testing purposes as the date had multiple rehearsal tracks
if service_date == "June 15, 2025":
print(f"Special consideration for {service_date}")
else:
# Check if service date is in the future before proceeding
if not Utils.is_future_date(service_date):
logging.info(f"Service date {service_date} is in the past. Exiting.")
return f"Service date {service_date} is in the past. Exiting."
return service_date, webhook_ids
def get_my_plans(incoming_ids):
"""
This is the url for all the plans the user is part of
"""
url = f"{BASE}/services/v2/people/{USER_ID}/plan_people"
# TODO: Move all calls to the utils file
data = requests.get(url, auth=(CLIENTSEC, SECRET)).json()
plan_ids = []
for items in data["data"]:
plan_ids.append(
(
items["relationships"]["plan"]["data"]["id"],
items["relationships"]["service_type"]["data"]["id"],
)
)
# In Tuple, index 0 is the service_type_id and index 1 is the plan_id
# Service Type is a parent of the plan. Plans exist within services
if len(plan_ids) > 1:
for plan in plan_ids:
if plan == incoming_ids:
return incoming_ids[1], incoming_ids[0]
else:
return "Plan doesn't exist in PlanPerson payloads"
else:
if plan_ids[0] == incoming_ids:
return incoming_ids[1], incoming_ids[0]
else:
return "Plan doesn't exist in PlanPerson payloads"
def get_plans(service_type_id, plan_id, service_date):
planurl = (
f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/items"
)
reqdata = requests.get(planurl, auth=(CLIENTSEC, SECRET)).json()
logging.debug(reqdata)
get_file_key(reqdata, service_type_id, plan_id, planurl, service_date)
def get_file_key(reqdata, service_type_id, plan_id, planurl, service_date):
filenames = []
for items in reqdata["data"]:
if items["attributes"]["item_type"] == "song":
item_id = items["id"]
getkey = requests.get(
f"{planurl}/{item_id}/key", auth=(CLIENTSEC, SECRET)
).json()
key = getkey["data"]["attributes"]["starting_key"]
filename = f"{items['attributes']['title']} - {key}"
filenames.append(filename)
print(f"Here are the filenames: {filenames}")
get_attachments(filenames, service_type_id, plan_id, service_date)
def get_attachments(filenames, service_type_id, plan_id, service_date):
# I've added the payload to logging so please check this payload to see what your church will/won't allow.
planattachments = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/all_attachments"
plana = requests.get(planattachments, auth=(CLIENTSEC, SECRET)).json()
pp.pprint(plana)
logging.info("Looking for attachments")
non_downloadables = []
if plana["data"] == []:
logging.info(f"No attachments available for {service_date}")
else:
logging.info(f"Attachments available for {service_date}")
for attachments in plana["data"]:
if attachments["attributes"]["allow_mp3_download"]:
if (
attachments["attributes"]["content_type"] is not None
and "audio" in attachments["attributes"]["content_type"]
):
attachment_id = attachments["id"]
file = attachments["attributes"]["filename"]
if ALLFILES == "False":
print(
f"Grabbing only track name matches as ALLFILES is {ALLFILES}"
)
print(file)
print(filenames)
close_match = Utils.find_close_matches(file, filenames)
# TODO: Figure out how to run the close matches with non-obvious file/content names
get_download_url(attachment_id, close_match, ALLFILES)
elif ALLFILES == "True":
print(f"Grabbing all files as ALLFILES is {ALLFILES}")
get_download_url(attachment_id, file, ALLFILES)
else:
continue
else:
non_downloadables.append(attachments["attributes"]["filename"])
continue
if len(non_downloadables) > 0:
logging.info(
f"The following audio content-types files do not allow for downloading: {non_downloadables}"
)
return "Returning this statement."
def get_download_url(uuid, file, allfiles):
"""
Has not been fully tested yet.
To get the s3 bucket download url, this is a sample curl url:
curl -u client_sec:secret_secret -X POST https://api.planningcenteronline.com/services/v2/media/media_id/attachments/attachment_id/open
"""
downurl = f"{BASE}/services/v2/attachments/{uuid}/open"
downreq = requests.post(downurl, auth=(CLIENTSEC, SECRET))
downdata = downreq.json()
s3url = downdata["data"]["attributes"]["attachment_url"]
s3response = requests.get(s3url)
print(s3response.status_code)
if s3response.status_code == 200:
with open(f"{str(file)}", "wb") as f:
f.write(s3response.content)
# if allfiles == "False":
# # with open(f"{str(file)[2:-2]}.mp3", "wb") as f:
# with open(f"{str(file)}", "wb") as f:
# f.write(s3response.content)
# else:
# with open(f"{str(file)}", "wb") as f:
# f.write(s3response.content)
edit_metadata(file)
return "Successfully downloaded"
else:
return "Something went wrong with the download"
def edit_metadata(file):
print(file)
print("******")
command = [
"id3v2",
"-g",
"Christian",
# TODO: Add environmental variable for people to change their own genre.
"-A",
"PCO",
"-a",
ARTIST,
"-t",
file,
file,
]
subprocess.call(command)
if __name__ == "__main__":
import logging
numeric_level = getattr(logging, LOGLEVEL.upper(), None)
logging.basicConfig(filename="record.log", level=numeric_level)
app.run(host="0.0.0.0", port=8000, debug=True)