First commit for this small Flask app. Logic added for following states: my service type future with attachments, my service type future no attachments, my service type past, not my service. Need to test with real attachments and add functions for adding the Id3 tags. Will also need to bundle this in a docker container for ease of deployment.

This commit is contained in:
Norm Rasmussen
2025-06-11 16:46:35 -04:00
commit b8c9bb0ad1
8 changed files with 255 additions and 0 deletions

14
.env.example Normal file
View File

@ -0,0 +1,14 @@
# Please go to https://api.planningcenteronline.com/oauth/applications
# Create new Personal Access Token
# Paste them below
CLIENT = ""
SECRET = ""
# USER_ID can be found via API or after logging into PCO, clicking your initials in top right of screen, and clicking "My Profile".
# The URL will be: https://services.planningcenteronline.com/people/XX{user_id}/edit
# Just grab the numbers, not the two letters before it.
USER_ID = ""
# DIR is the final resting place of your music files from PCO. Should be accessible on the same device/VM as this flask app is running.
# TODO: Add ssh possibility
DIR = ""
# Options include DEBUG, INFO, ERROR
LOGLEVEL = "INFO"

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.*
record.log
!.gitignore
!.env.example

Binary file not shown.

Binary file not shown.

141
main.py Normal file
View File

@ -0,0 +1,141 @@
import requests
import json
import pprint
from flask import Flask, request
from dotenv import load_dotenv
import os
from utilfuncs import Utils
load_dotenv(override=True)
app = Flask(__name__)
pp = pprint.PrettyPrinter(indent=4)
CLIENTSEC = os.getenv("CLIENT")
SECRET = os.getenv("SECRET")
USER_ID = os.getenv("USER_ID")
SAVE_DIR = os.getenv("DIR")
BASE = "https://api.planningcenteronline.com"
LOGLEVEL = os.getenv("LOGLEVEL")
@app.route("/webhook", methods=["POST"])
def accept_webhook():
logging.info("Webhook page prepared and ready to receive webhook.")
if request.method == "POST":
logging.info("POST call received")
payload = json.loads(request.json)
logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
try:
service_date, webhook_ids = parse_payload(payload)
if service_date:
service_id, plan_id = get_my_plans(webhook_ids)
if service_id is None or plan_id is None:
pass
else:
return get_plans(service_id, plan_id, service_date)
except TypeError as e:
print(e)
finally:
return "End of app"
def parse_payload(payload):
for item in payload["data"]:
# event_type = item['attributes']['name']
webhook_ids = (
item["attributes"]["payload"]["data"]["id"],
item["attributes"]["payload"]["data"]["relationships"]["service_type"][
"data"
]["id"],
)
service_date = item["attributes"]["payload"]["data"]["attributes"]["dates"]
# Check if service date is in the future before proceeding
if not Utils.is_future_date(service_date):
logging.info(f"Service date {service_date} is in the past. Exiting.")
return f"Service date {service_date} is in the past. Exiting."
return service_date, webhook_ids
def get_my_plans(incoming_ids):
"""
This is the url for all the plans the user is part of
"""
url = f"{BASE}/services/v2/people/{USER_ID}/plan_people"
data = requests.get(url, auth=(CLIENTSEC, SECRET)).json()
plan_ids = []
for items in data["data"]:
plan_ids.append(
(
items["relationships"]["plan"]["data"]["id"],
items["relationships"]["service_type"]["data"]["id"],
)
)
# In Tuple, index 0 is the service_type_id and index 1 is the plan_id
# Service Type is a parent of the plan. Plans exist within services
if len(plan_ids) > 1:
for plan in plan_ids:
if plan == incoming_ids:
return incoming_ids[1], incoming_ids[0]
else:
return None
else:
if plan_ids[0] == incoming_ids:
return incoming_ids[1], incoming_ids[0]
else:
return None
def get_plans(service_type_id, plan_id, service_date):
filenames = []
planurl = (
f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/items"
)
reqdata = requests.get(planurl, auth=(CLIENTSEC, SECRET)).json()
for items in reqdata["data"]:
if items["attributes"]["item_type"] == "song":
item_id = items["id"]
getkey = requests.get(
f"{planurl}/{item_id}/key", auth=(CLIENTSEC, SECRET)
).json()
key = getkey["data"]["attributes"]["starting_key"]
filename = f"{items['attributes']['title']} - {key}"
filenames.append(filename)
planattachments = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/items/{item_id}/media"
planattachments = f"{BASE}/services/v2/service_types/{service_type_id}/plans/{plan_id}/all_attachments"
plana = requests.get(planattachments, auth=(CLIENTSEC, SECRET)).json()
logging.info("Looking for attachments")
if plana["data"] == []:
logging.info(f"No attachments available for {service_date}")
else:
logging.info(f"Attachments available for {service_date}")
for attachments in plana["data"]:
if attachments["attributes"]["content_type"] == "audio/mpeg":
# attachment_id = attachments["id"]
file = attachments["attributes"]["filename"]
# close_match = Utils.find_close_matches(file, filenames)
# I know this works outside of a flask app with a specific payload, but I haven't gotten this far to test functions across all possible webhook payloads
# return get_download_url(attachment_id, close_match)
return "Next function is the get_download_url"
def get_download_url(uuid, file):
"""
Has not been fully tested yet.
To get the s3 bucket download url, this is a sample curl url:
curl -u client_sec:secret_secret -X POST https://api.planningcenteronline.com/services/v2/media/media_id/attachments/attachment_id/open
"""
downurl = f"{BASE}/services/v2/attachments/{uuid}/open"
downreq = requests.post(downurl, auth=(CLIENTSEC, SECRET))
downdata = downreq.json()
s3url = downdata["data"]["attributes"]["attachment_url"]
s3response = requests.get(s3url)
with open(f"{str(file)[2:-2]}.mp3", "wb") as f:
f.write(s3response.content)
if __name__ == "__main__":
import logging
numeric_level = getattr(logging, LOGLEVEL.upper(), None)
logging.basicConfig(filename='record.log', level=numeric_level)
app.run(host="0.0.0.0", port=8000, debug=True)

18
requirements.txt Normal file
View File

@ -0,0 +1,18 @@
black==25.1.0
blinker==1.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.2.1
flask==3.1.1
idna==3.10
itsdangerous==2.2.0
jinja2==3.1.6
markupsafe==3.0.2
mypy-extensions==1.1.0
packaging==25.0
pathspec==0.12.1
platformdirs==4.3.8
python-dotenv==1.1.0
requests==2.32.3
urllib3==2.4.0
werkzeug==3.1.3

13
send_webhook.py Normal file
View File

@ -0,0 +1,13 @@
import json
import requests
from utilfuncs import Utils
import sys
URL = "http://127.0.0.1:8000/webhook"
HEADERS = {"content-type": "application/json"}
file = sys.argv[1]
my_campus_payload = Utils.load_json_from_file(file)
response = requests.post(URL, headers=HEADERS, json=json.dumps(my_campus_payload))
print(response.status_code)
print(response.text)

65
utilfuncs.py Normal file
View File

@ -0,0 +1,65 @@
import time
import json
import datetime
import difflib
class Utils:
def is_future_date(service_date):
"""
Converts service_date in format "Mon Day, Year" to epoch time
and compares it to today's date.
Args:
service_date (str): Date string in format "Mon Day, Year" (e.g., "Jun 15, 2025")
Returns:
bool: True if service_date is in the future, False otherwise
"""
try:
date_obj = datetime.datetime.strptime(service_date, "%B %d, %Y")
service_epoch = int(date_obj.timestamp())
current_epoch = int(time.time())
return service_epoch > current_epoch
except ValueError as e:
print(f"Error parsing date: {e}")
return True
def find_close_matches(word, possibilities, cutoff=0.4):
"""
Finds close matches for a word from a list of possibilities.
Args:
word (str): The word to find close matches for.
possibilities (list): A list of possible matches.
cutoff (float, optional): The minimum similarity ratio for a word to be considered a close match. Defaults to 0.6.
Returns:
list: A list of close matches.
"""
return difflib.get_close_matches(word, possibilities, cutoff=cutoff)
def load_json_from_file(file_path):
"""
Reads and loads JSON data from a file.
Args:
file_path (str): Path to the JSON file
Returns:
dict/list: Parsed JSON data
Raises:
FileNotFoundError: If the specified file doesn't exist
json.JSONDecodeError: If the file contains invalid JSON
"""
try:
with open(file_path, 'r') as file:
data = json.load(file)
return data
except FileNotFoundError:
print(f"Error: File '{file_path}' not found")
raise
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON format in file '{file_path}': {str(e)}")
raise
except Exception as e:
print(f"Error: An unexpected error occurred: {str(e)}")
raise