Files
Gainsight/Scripts/API_Tests/bulk_mark_ilt_attendance.py
2025-12-04 16:54:04 -05:00

152 lines
5.8 KiB
Python

"""
{"payload":{"training_session_attendance_ids":["7ebf8a61-73af-4d39-9792-92e54a7bcab6","5d483ea0-5f78-4b72-85f3-8fd191127974","aea3b1cb-0cb3-4b8a-bb1d-b7889d546e1e","0944ef2b-13ff-4d7d-87e2-6893213da321"],"attendance":"attended"}}
POST to https://api.northpass.com/v2/bulk/registrant_attendances
In Norm's Sandbox:
- Event UUID: 4787dc9f-aab2-4eb7-8790-e0e50738c52c
- Future Session UUID: 46102626-2966-4153-80ba-c84b5aaef31a
- Past Session UUID: 072909ff-ace8-4ef1-84bb-e93320486134
Sample CSV from Nintex:
Name,Email,Attended
Norm1,norm36@norm.com,Yes,Not Activated,Learner
Norm1,norm37@norm.com,Yes,Not Activated,Learner
Norm1,norm31@norm.com,Yes,Not Activated,Learner
Climbing Norm,norm+climbing@northpass.com,Yes,Not Activated,Admin
Admin Norm,nrasmussen+test2@gainsight.com,Yes,Activated, Admin
Mtn Norm,norm+mtnclimb@northpass.com,Yes,Activated,Manager
No Atoms,energy@energy.com,Yes,Activated,Learner
"""
import pandas as pd
import requests
import Apikeys
import pprint
from urllib.parse import quote
from collections import defaultdict
pp = pprint.PrettyPrinter(indent=4)
IMPORTFILE = "/Users/normrasmussen/Downloads/nintex-attendance-sample.csv"
BASEURL = "https://api.northpass.com/v2"
APIKEY = Apikeys.NORMSANDBOX
HEADERS = {
"accept": "application/json",
"X-Api-Key": APIKEY,
}
BASEURL = "https://api.northpass.com/v2/"
EVENT_UUID = "4787dc9f-aab2-4eb7-8790-e0e50738c52c"
FUTURE_SESH = "46102626-2966-4153-80ba-c84b5aaef31a"
PAST_SESH = "072909ff-ace8-4ef1-84bb-e93320486134"
SESH_WTH_PPL = "b3ec3a8b-ecd1-4dde-8d74-96377393026f"
def check_if_exists(emails):
in_system = []
not_in_system = []
for email in emails:
encemail = quote(email)
try:
find_person_url = f"{BASEURL}/people?filter[email][eq]={encemail}"
find_person_resp = requests.get(find_person_url, headers=HEADERS).json()
person_uuid = find_person_resp["data"][0]["id"]
except Exception as e:
print(f"Error! {e} with person {email}")
if find_person_resp['data'] == []:
not_in_system.append(email)
else:
print(f"Person's email: {email} and uuid: {person_uuid}")
in_system.append(person_uuid)
return not_in_system, in_system
def create_people(emails):
load_list = []
print(str(emails)[2:-2])
if len(emails) > 1:
for people in emails:
miniload = {"email": people, "groups": ""}
load_list.append(miniload)
payload = { "data": { "attributes": { "people": load_list } } }
else:
payload = { "data": { "attributes": { "people": [{"email": str(emails)[2:-2] }] } } }
bulk_create = f"{BASEURL}/bulk/people"
bulk_invite = requests.post(url=bulk_create, headers=HEADERS, json=payload)
print(payload)
return bulk_invite.status_code
def check_attendance():
df = pd.read_csv(IMPORTFILE)
emails = df["Email"].unique()
attendees = []
check_attendance_url = f"{BASEURL}training_events/{EVENT_UUID}/training_sessions/{SESH_WTH_PPL}/registrants"
check_attendance_resp = requests.get(check_attendance_url, headers=HEADERS).json()
for person in check_attendance_resp['data']:
attendance_uuid = person['id']
person_uuid = person['relationships']['person']['data']['id']
person_email = person['attributes']['person_email']
attendees.append((person_uuid, person_email, attendance_uuid))
# registered_emails, non_registered = compare_csv_to_registrants(emails, attendees)
return compare_csv_to_registrants(emails, attendees)
def register_for_session(in_system):
def compare_csv_to_registrants(emails, attendees):
tuple_mapping = defaultdict(list)
# NOTE: Some logic. Emails given to script. Session registrants grabbed. If email is on registered list, mark as attended. If email is NOT on registered list, check if they exist. If they exist, register+mark as attended. If they don't, create user, then register+mark as attended.
for person in attendees:
tuple_mapping[person[1]].append(person)
registered_emails = []
not_registered_emails = []
for item in emails:
if item in tuple_mapping:
registered_emails.extend(tuple_mapping[item])
else:
not_registered_emails.append(item)
return registered_emails, not_registered_emails
def mark_attendance(tuple_list):
attendance_url = f"{BASEURL}/bulk/registrant_attendances"
att_uuids = [stuff[2] for stuff in tuple_list]
payload = { "payload": {
"training_session_attendance_ids": att_uuids,
"attendance": "attended"
}}
mark_attended = requests.post(attendance_url, headers=HEADERS, json=payload)
return mark_attended.status_code
if __name__ == "__main__":
lists = check_attendance()
not_in_system, in_system = check_if_exists(lists[1])
if not_in_system is not None:
create_people(not_in_system)
# TODO: Something needs to be added here. After we add the people, we need to check them again for check if exists and then that should go straight into the register function. That should make the data flow more reusable.
# This could be a for loop until all the non-emails have been added, they are then grouped back with all the other non-registered users, and then flow through the registration functions.
"""
So the flow would be:
get CSV and grab all emails
compare emails to registrations
emails that are registered go over to the mark as attended bucket.
emails that are not registered, are checked if they exist
if they exist, they are on hold.
if they don't exist, they are created
people are now all mixed with the on hold people
all are registered
now the original list can be mark as attended
"""
# mark_status = mark_attendance(lists[0])
# in_system, out_system = check_if_exists(lists[1])
# check_if_exists()