2025-06-11 16:46:35 -04:00
import requests
2025-06-18 21:17:32 -04:00
import os
import subprocess
2025-06-11 16:46:35 -04:00
import json
import pprint
from flask import Flask , request
from utilfuncs import Utils
2025-06-18 21:17:32 -04:00
from envvars import (
CLIENTSEC ,
SECRET ,
USER_ID ,
SAVE_DIR ,
BASE ,
LOGLEVEL ,
ALLFILES ,
ARTIST ,
)
2025-06-11 16:46:35 -04:00
2025-06-18 21:17:32 -04:00
# Instantiating libraries and other parameters
2025-06-11 16:46:35 -04:00
app = Flask ( __name__ )
pp = pprint . PrettyPrinter ( indent = 4 )
2025-06-18 21:17:32 -04:00
app . json . compact = False
# This function is for production.
# @app.route("/webhook", methods=["POST"])
# def accept_webhook():
# logging.info("Webhook page prepared and ready to receive webhook.")
# if request.method == "POST":
# logging.info("POST call received")
# payload = json.loads(request.json)
# logging.debug(f"***** DEBUG PAYLOAD *****\n{payload}\n************")
# try:
# service_date, webhook_ids = parse_payload(payload)
# if service_date:
# service_id, plan_id = get_my_plans(webhook_ids)
# if service_id is None or plan_id is None:
# pass
# else:
# return get_plans(service_id, plan_id, service_date)
# except TypeError as e:
# print(e)
# finally:
# return "End of app"
2025-06-11 16:46:35 -04:00
2025-06-18 21:17:32 -04:00
# This function is for testing and doesn't compare the user's plan/service ids to the webhook.
# TODO: Use the parts of this function that skip over `get_my_plans` as part of a env variable to allow users to download music outside of their assigned service and plans.
2025-06-11 16:46:35 -04:00
@app.route ( " /webhook " , methods = [ " POST " ] )
def accept_webhook ( ) :
logging . info ( " Webhook page prepared and ready to receive webhook. " )
if request . method == " POST " :
logging . info ( " POST call received " )
payload = json . loads ( request . json )
logging . debug ( f " ***** DEBUG PAYLOAD ***** \n { payload } \n ************ " )
try :
service_date , webhook_ids = parse_payload ( payload )
if service_date :
2025-06-18 21:17:32 -04:00
service_id = webhook_ids [ 1 ]
plan_id = webhook_ids [ 0 ]
get_plans ( service_id , plan_id , service_date )
return " Try loop completed "
2025-06-11 16:46:35 -04:00
except TypeError as e :
2025-06-18 21:17:32 -04:00
print ( service_id , plan_id )
print ( " This is a type error " )
2025-06-11 16:46:35 -04:00
print ( e )
finally :
return " End of app "
def parse_payload ( payload ) :
for item in payload [ " data " ] :
2025-06-18 21:17:32 -04:00
# event_type lives at item['attributes']['name']
2025-06-11 16:46:35 -04:00
webhook_ids = (
item [ " attributes " ] [ " payload " ] [ " data " ] [ " id " ] ,
item [ " attributes " ] [ " payload " ] [ " data " ] [ " relationships " ] [ " service_type " ] [
" data "
] [ " id " ] ,
)
2025-06-18 21:17:32 -04:00
# Webhook IDs tuple are: (plan id, service id)
2025-06-11 16:46:35 -04:00
service_date = item [ " attributes " ] [ " payload " ] [ " data " ] [ " attributes " ] [ " dates " ]
2025-06-18 21:17:32 -04:00
# This is just for testing purposes as the date had multiple rehearsal tracks
if service_date == " June 15, 2025 " :
print ( f " Special consideration for { service_date } " )
else :
# Check if service date is in the future before proceeding
if not Utils . is_future_date ( service_date ) :
logging . info ( f " Service date { service_date } is in the past. Exiting. " )
return f " Service date { service_date } is in the past. Exiting. "
2025-06-11 16:46:35 -04:00
return service_date , webhook_ids
2025-06-18 21:17:32 -04:00
2025-06-11 16:46:35 -04:00
def get_my_plans ( incoming_ids ) :
"""
This is the url for all the plans the user is part of
"""
url = f " { BASE } /services/v2/people/ { USER_ID } /plan_people "
2025-06-18 21:17:32 -04:00
# TODO: Move all calls to the utils file
2025-06-11 16:46:35 -04:00
data = requests . get ( url , auth = ( CLIENTSEC , SECRET ) ) . json ( )
plan_ids = [ ]
for items in data [ " data " ] :
plan_ids . append (
(
items [ " relationships " ] [ " plan " ] [ " data " ] [ " id " ] ,
items [ " relationships " ] [ " service_type " ] [ " data " ] [ " id " ] ,
)
)
# In Tuple, index 0 is the service_type_id and index 1 is the plan_id
# Service Type is a parent of the plan. Plans exist within services
if len ( plan_ids ) > 1 :
for plan in plan_ids :
if plan == incoming_ids :
return incoming_ids [ 1 ] , incoming_ids [ 0 ]
else :
2025-06-18 21:17:32 -04:00
return " Plan doesn ' t exist in PlanPerson payloads "
2025-06-11 16:46:35 -04:00
else :
if plan_ids [ 0 ] == incoming_ids :
return incoming_ids [ 1 ] , incoming_ids [ 0 ]
else :
2025-06-18 21:17:32 -04:00
return " Plan doesn ' t exist in PlanPerson payloads "
2025-06-11 16:46:35 -04:00
def get_plans ( service_type_id , plan_id , service_date ) :
planurl = (
f " { BASE } /services/v2/service_types/ { service_type_id } /plans/ { plan_id } /items "
)
reqdata = requests . get ( planurl , auth = ( CLIENTSEC , SECRET ) ) . json ( )
2025-06-18 21:17:32 -04:00
logging . debug ( reqdata )
get_file_key ( reqdata , service_type_id , plan_id , planurl , service_date )
def get_file_key ( reqdata , service_type_id , plan_id , planurl , service_date ) :
filenames = [ ]
2025-06-11 16:46:35 -04:00
for items in reqdata [ " data " ] :
if items [ " attributes " ] [ " item_type " ] == " song " :
item_id = items [ " id " ]
getkey = requests . get (
f " { planurl } / { item_id } /key " , auth = ( CLIENTSEC , SECRET )
) . json ( )
key = getkey [ " data " ] [ " attributes " ] [ " starting_key " ]
filename = f " { items [ ' attributes ' ] [ ' title ' ] } - { key } "
filenames . append ( filename )
2025-06-18 21:17:32 -04:00
print ( f " Here are the filenames: { filenames } " )
get_attachments ( filenames , service_type_id , plan_id , service_date )
def get_attachments ( filenames , service_type_id , plan_id , service_date ) :
# I've added the payload to logging so please check this payload to see what your church will/won't allow.
2025-06-11 16:46:35 -04:00
planattachments = f " { BASE } /services/v2/service_types/ { service_type_id } /plans/ { plan_id } /all_attachments "
plana = requests . get ( planattachments , auth = ( CLIENTSEC , SECRET ) ) . json ( )
2025-06-18 21:17:32 -04:00
pp . pprint ( plana )
2025-06-11 16:46:35 -04:00
logging . info ( " Looking for attachments " )
2025-06-18 21:17:32 -04:00
non_downloadables = [ ]
2025-06-11 16:46:35 -04:00
if plana [ " data " ] == [ ] :
logging . info ( f " No attachments available for { service_date } " )
else :
logging . info ( f " Attachments available for { service_date } " )
for attachments in plana [ " data " ] :
2025-06-18 21:17:32 -04:00
if attachments [ " attributes " ] [ " allow_mp3_download " ] :
if (
attachments [ " attributes " ] [ " content_type " ] is not None
and " audio " in attachments [ " attributes " ] [ " content_type " ]
) :
attachment_id = attachments [ " id " ]
file = attachments [ " attributes " ] [ " filename " ]
if ALLFILES == " False " :
print (
f " Grabbing only track name matches as ALLFILES is { ALLFILES } "
)
print ( file )
print ( filenames )
close_match = Utils . find_close_matches ( file , filenames )
# TODO: Figure out how to run the close matches with non-obvious file/content names
get_download_url ( attachment_id , close_match , ALLFILES )
elif ALLFILES == " True " :
print ( f " Grabbing all files as ALLFILES is { ALLFILES } " )
get_download_url ( attachment_id , file , ALLFILES )
else :
continue
else :
non_downloadables . append ( attachments [ " attributes " ] [ " filename " ] )
continue
if len ( non_downloadables ) > 0 :
logging . info (
f " The following audio content-types files do not allow for downloading: { non_downloadables } "
)
return " Returning this statement. "
2025-06-11 16:46:35 -04:00
2025-06-18 21:17:32 -04:00
def get_download_url ( uuid , file , allfiles ) :
2025-06-11 16:46:35 -04:00
"""
Has not been fully tested yet .
To get the s3 bucket download url , this is a sample curl url :
curl - u client_sec : secret_secret - X POST https : / / api . planningcenteronline . com / services / v2 / media / media_id / attachments / attachment_id / open
"""
downurl = f " { BASE } /services/v2/attachments/ { uuid } /open "
downreq = requests . post ( downurl , auth = ( CLIENTSEC , SECRET ) )
downdata = downreq . json ( )
s3url = downdata [ " data " ] [ " attributes " ] [ " attachment_url " ]
s3response = requests . get ( s3url )
2025-06-18 21:17:32 -04:00
print ( s3response . status_code )
if s3response . status_code == 200 :
with open ( f " { str ( file ) } " , " wb " ) as f :
f . write ( s3response . content )
# if allfiles == "False":
# # with open(f"{str(file)[2:-2]}.mp3", "wb") as f:
# with open(f"{str(file)}", "wb") as f:
# f.write(s3response.content)
# else:
# with open(f"{str(file)}", "wb") as f:
# f.write(s3response.content)
edit_metadata ( file )
return " Successfully downloaded "
else :
return " Something went wrong with the download "
def edit_metadata ( file ) :
print ( file )
print ( " ****** " )
command = [
" id3v2 " ,
" -g " ,
" Christian " ,
# TODO: Add environmental variable for people to change their own genre.
" -A " ,
" PCO " ,
" -a " ,
ARTIST ,
" -t " ,
file ,
file ,
]
subprocess . call ( command )
2025-06-11 16:46:35 -04:00
if __name__ == " __main__ " :
import logging
2025-06-18 21:17:32 -04:00
2025-06-11 16:46:35 -04:00
numeric_level = getattr ( logging , LOGLEVEL . upper ( ) , None )
2025-06-18 21:17:32 -04:00
logging . basicConfig ( filename = " record.log " , level = numeric_level )
2025-06-11 16:46:35 -04:00
app . run ( host = " 0.0.0.0 " , port = 8000 , debug = True )