BirchStreet download. Fixed Cin7 and backed up all the templates. Sandbox templates.

This commit is contained in:
Norm Rasmussen
2024-12-16 16:29:06 -05:00
parent 9a045e60dc
commit b8d085e12c
603 changed files with 13679 additions and 5939 deletions

View File

@ -40,6 +40,12 @@ project_name = argv[1]
def get_data():
"""
Process CSV data to create course completion and enrollment records.
Reads data from CSV, processes each row to get people and course information,
and creates corresponding attempt and enrollment payloads.
"""
df = manage_csv.import_as_dataframe()
df.columns = df.columns.str.lower()
# row_iterator = df.iterrows()
@ -64,6 +70,16 @@ def get_data():
def get_people(email):
"""
Retrieve person information by email address.
Args:
email (str): Email address of the person to look up
Returns:
tuple: (uuid, email) if person is found and activated
None: if person is not found or not activated
"""
url = f"{baseurl}/people?filter[email][eq]={email}"
returned = calls.get(url)
if returned["data"] == []:
@ -79,6 +95,16 @@ def get_people(email):
def get_courses(course):
"""
Retrieve course information by course name.
Args:
course (str): Name of the course to look up
Returns:
str: Course UUID if found
None: if course is not found
"""
print(f"LOOKING FOR COURSE {course}")
encoded_course = quote(course)
url = f"{baseurl}/courses?filter[name][eq]={encoded_course}"
@ -91,6 +117,17 @@ def get_courses(course):
def create_attempt_payload(course_uuid, learner_uuid, date_str):
"""
Create a course attempt payload for API submission.
Args:
course_uuid (str): UUID of the course
learner_uuid (str): UUID of the learner
date_str (str): Completion date string
Returns:
dict: Formatted payload for course attempt creation
"""
print(f"Creating Course Attempt Payload")
now = datetime.now()
date_format = "%Y-%m-%d %H:%M:%S"
@ -126,6 +163,17 @@ def create_attempt_payload(course_uuid, learner_uuid, date_str):
def create_enrollment_payload(course_uuid, learner_uuid, date_str):
"""
Create an enrollment payload for API submission.
Args:
course_uuid (str): UUID of the course
learner_uuid (str): UUID of the learner
date_str (str): Enrollment date string
Returns:
dict: Formatted payload for enrollment creation
"""
print(f"Creating Course Enrollment Payload")
now = datetime.now()
date_format = "%Y-%m-%d %H:%M:%S"
@ -155,6 +203,14 @@ def create_enrollment_payload(course_uuid, learner_uuid, date_str):
def create_enrollment_resource(enrollment_payload):
"""
Submit enrollment resource creation request to API.
Args:
enrollment_payload (dict): Formatted enrollment payload
Prints API response status code.
"""
item = items["enrollments"]
enrollment_url = f"{baseurl}/migration/projects/{list(probject.values())[0]}/items/{item}/enrollment_resources"
print(enrollment_url)
@ -166,6 +222,14 @@ def create_enrollment_resource(enrollment_payload):
def create_attempt_resource(attempt_payload):
"""
Submit course attempt resource creation request to API.
Args:
attempt_payload (dict): Formatted course attempt payload
Prints API response status code.
"""
item = items["course_attempts"]
attempt_url = f"{baseurl}/migration/projects/{list(probject.values())[0]}/items/{item}/course_attempt_resources"
print(attempt_url)
@ -177,6 +241,14 @@ def create_attempt_resource(attempt_payload):
)
def create_project(project_name):
"""
Create a new migration project.
Args:
project_name (str): Name of the project to create
Updates global probject dictionary with new project ID.
"""
proj_payload = {
"data": {
"type": "migration_projects",
@ -190,6 +262,17 @@ def create_project(project_name):
def create_item(i_type):
"""
Create a new migration item.
Args:
i_type (str): Type of item to create (e.g., 'course_attempts', 'enrollments')
Returns:
str: Created item ID
Updates global items dictionary with new item ID.
"""
print("Creating Item")
# Item Type Options: 'courses', 'sections', 'activities', 'people', 'enrollments', 'course_attempts', 'quiz_attempts', 'certificates', 'learning_path_attempts'
item_full_url = f"{baseurl}/migration/projects/{list(probject.values())[0]}/items"
@ -209,6 +292,11 @@ def create_item(i_type):
def start_migration():
"""
Start the migration process for the created project.
Sends request to API to begin migration and prints response.
"""
print("Starting Migration Function")
start_url = (
f"{baseurl}/migration/projects/{list(probject.values())[0]}/start_migration"