174 lines
6.6 KiB
Python
174 lines
6.6 KiB
Python
"""
|
|
{"payload":{"training_session_attendance_ids":["7ebf8a61-73af-4d39-9792-92e54a7bcab6","5d483ea0-5f78-4b72-85f3-8fd191127974","aea3b1cb-0cb3-4b8a-bb1d-b7889d546e1e","0944ef2b-13ff-4d7d-87e2-6893213da321"],"attendance":"attended"}}
|
|
|
|
POST to https://api.northpass.com/v2/bulk/registrant_attendances
|
|
|
|
In Norm's Sandbox:
|
|
- Event UUID: 4787dc9f-aab2-4eb7-8790-e0e50738c52c
|
|
- Future Session UUID: 46102626-2966-4153-80ba-c84b5aaef31a
|
|
- Past Session UUID: 072909ff-ace8-4ef1-84bb-e93320486134
|
|
|
|
Sample CSV from Nintex:
|
|
Name,Email,Attended
|
|
Norm1,norm36@norm.com,Yes,Not Activated,Learner
|
|
Norm1,norm37@norm.com,Yes,Not Activated,Learner
|
|
Norm1,norm31@norm.com,Yes,Not Activated,Learner
|
|
Climbing Norm,norm+climbing@northpass.com,Yes,Not Activated,Admin
|
|
Admin Norm,nrasmussen+test2@gainsight.com,Yes,Activated, Admin
|
|
Mtn Norm,norm+mtnclimb@northpass.com,Yes,Activated,Manager
|
|
No Atoms,energy@energy.com,Yes,Activated,Learner
|
|
|
|
"""
|
|
|
|
import pandas as pd
|
|
import requests
|
|
import Apikeys
|
|
import pprint
|
|
from urllib.parse import quote
|
|
from collections import defaultdict
|
|
import random
|
|
import string
|
|
|
|
|
|
# Shared pretty-printer for ad-hoc debugging output.
pp = pprint.PrettyPrinter(indent=4)

# CSV export whose "Email" column is the intended source of attendee emails.
IMPORTFILE = "/Users/normrasmussen/Downloads/nintex-attendance-sample.csv"

# API root. This constant used to be assigned twice (once without and once
# with the trailing slash); only the later, trailing-slash value was ever
# in effect, so that is the one kept here.
BASEURL = "https://api.northpass.com/v2/"

APIKEY = Apikeys.NORMSANDBOX
HEADERS = {
    "accept": "application/json",
    "X-Api-Key": APIKEY,
}

# Sandbox identifiers for the training event and its sessions.
EVENT_UUID = "4787dc9f-aab2-4eb7-8790-e0e50738c52c"
FUTURE_SESH = "46102626-2966-4153-80ba-c84b5aaef31a"
PAST_SESH = "072909ff-ace8-4ef1-84bb-e93320486134"
SESH_WTH_PPL = "b3ec3a8b-ecd1-4dde-8d74-96377393026f"

# Work queues shared by the functions in this module:
#   to_be_marked     - registrant tuples whose attendance should be marked
#   to_be_created    - emails with no matching person record yet
#   to_be_registered - person UUIDs that exist but are not registered
PPL_LISTS = {"to_be_marked": [], "to_be_created": [], "to_be_registered": []}
|
|
|
|
|
|
def check_if_exists(emails):
    """Sort each email into ``PPL_LISTS`` by whether a person record exists.

    Every email is looked up through the ``people`` endpoint with an
    ``filter[email][eq]`` query. An email with no match is queued in
    ``PPL_LISTS['to_be_created']``; for a match, the person's UUID is queued
    in ``PPL_LISTS['to_be_registered']`` and the email is dropped from the
    creation queue. If the creation queue is non-empty afterwards it is
    handed to ``create_people`` and cleared when a 20x status comes back.

    Args:
        emails: iterable of email address strings.

    Returns:
        The placeholder string ``"Something"``.
    """
    # Tolerate BASEURL with or without a trailing slash.
    people_url = f"{BASEURL.rstrip('/')}/people"
    to_be_created = PPL_LISTS['to_be_created']
    to_be_registered = PPL_LISTS['to_be_registered']

    # Iterate over a snapshot: create_people() calls back into this function
    # with PPL_LISTS['to_be_created'] itself, and that list is mutated below.
    for email in list(emails):
        encemail = quote(email)
        find_person_url = f"{people_url}?filter[email][eq]={encemail}"
        try:
            find_person_resp = requests.get(find_person_url, headers=HEADERS).json()
        except (requests.RequestException, ValueError) as e:
            # Network failure or a non-JSON body: nothing can be decided
            # for this email, so report it and move on.
            print(f"Error! {e} with person {email}")
            continue

        matches = find_person_resp.get("data") if isinstance(find_person_resp, dict) else None
        if matches == []:
            # No such person yet; queue once even if checked repeatedly.
            if email not in to_be_created:
                to_be_created.append(email)
        elif isinstance(matches, list) and isinstance(matches[0], dict) and "id" in matches[0]:
            person_uuid = matches[0]["id"]
            if email in to_be_created:
                to_be_created.remove(email)
            if person_uuid not in to_be_registered:
                to_be_registered.append(person_uuid)
        else:
            # Response without a usable "data" list (e.g. an error payload).
            print(f"Error! Unexpected response with person {email}")

    if PPL_LISTS['to_be_created']:
        # NOTE(review): create_people() re-enters this function, so the two
        # keep calling each other until every queued email resolves to a
        # person - confirm this cannot recurse without bound.
        ret = create_people(PPL_LISTS['to_be_created'])
        # WARN: The status code will be 202 even with an empty list for emails!!
        if str(ret).startswith('20'):
            PPL_LISTS['to_be_created'] = []
    else:
        print("To Be Created List is now Empty")
    return "Something"
|
|
|
|
|
|
def create_people(emails):
    """Bulk-create people for the given emails and re-check that they exist.

    Posts one ``{"email": ..., "groups": ""}`` entry per address to the
    ``bulk/people`` endpoint, then runs ``check_if_exists`` on the same
    addresses.

    Args:
        emails: sequence of email address strings.

    Returns:
        The HTTP status code of the bulk-create request.
    """
    # Build every entry the same way. The single-email case used to slice
    # the address out of str(emails), which breaks for non-list sequences
    # and for addresses whose repr contains escapes, and it sent an empty
    # address for an empty input.
    load_list = [{"email": address, "groups": ""} for address in emails]
    payload = {"data": {"attributes": {"people": load_list}}}

    # Tolerate BASEURL with or without a trailing slash.
    bulk_create = f"{BASEURL.rstrip('/')}/bulk/people"
    bulk_invite = requests.post(url=bulk_create, headers=HEADERS, json=payload)

    # Pass a copy: check_if_exists() edits PPL_LISTS['to_be_created'], which
    # may be the very list this function was called with.
    check_if_exists(list(emails))
    return bulk_invite.status_code
|
|
|
|
|
|
def check_attendance(emails):
    """Fetch the session's registrants and split ``emails`` against them.

    Requests the registrants of session ``SESH_WTH_PPL`` in event
    ``EVENT_UUID`` and reduces each one to a
    ``(person_uuid, person_email, attendance_uuid)`` tuple.

    Args:
        emails: iterable of email address strings to look for.

    Returns:
        The result of ``compare_csv_to_registrants(emails, attendees)``.
    """
    # df = pd.read_csv(IMPORTFILE)
    # emails = df["Email"].unique()

    # Tolerate BASEURL with or without a trailing slash, like the other
    # request helpers in this module.
    check_attendance_url = (
        f"{BASEURL.rstrip('/')}/training_events/{EVENT_UUID}"
        f"/training_sessions/{SESH_WTH_PPL}/registrants"
    )
    check_attendance_resp = requests.get(check_attendance_url, headers=HEADERS).json()

    # NOTE(review): only the "data" of this single response is read -
    # confirm whether the endpoint paginates.
    attendees = [
        (
            person['relationships']['person']['data']['id'],
            person['attributes']['person_email'],
            person['id'],
        )
        for person in check_attendance_resp['data']
    ]

    return compare_csv_to_registrants(emails, attendees)
|
|
|
|
def register_for_session(to_register):
    """Placeholder for registering people for a training session.

    Not implemented yet: ``to_register`` is ignored and ``None`` is returned.
    """
    return None
|
|
|
|
|
|
def compare_csv_to_registrants(emails, attendees):
    """Split ``emails`` into those with registrant records and those without.

    Args:
        emails: iterable of email address strings, matched exactly as given.
        attendees: iterable of registrant tuples whose second element
            (index 1) is the registrant's email.

    Returns:
        A ``(registered, not_registered)`` pair. ``registered`` holds every
        attendee tuple whose email appears in ``emails``, in the order of
        ``emails``; ``not_registered`` holds the emails with no attendee
        tuple.
    """
    # Group the registrant tuples by their email for direct lookup.
    records_by_email = {}
    for record in attendees:
        records_by_email.setdefault(record[1], []).append(record)

    matched = []
    unmatched = []
    for address in emails:
        records = records_by_email.get(address)
        if records is None:
            unmatched.append(address)
        else:
            matched.extend(records)
    return matched, unmatched
|
|
|
|
def mark_attendance(tuple_list):
    """Mark the given registrants as attended in one bulk request.

    Args:
        tuple_list: iterable of registrant tuples whose third element
            (index 2) is the attendance UUID, as built by
            ``check_attendance``.

    Returns:
        The HTTP status code of the bulk attendance request.
    """
    # Tolerate BASEURL with or without a trailing slash so the request
    # path never contains "//".
    attendance_url = f"{BASEURL.rstrip('/')}/bulk/registrant_attendances"
    att_uuids = [registrant[2] for registrant in tuple_list]
    payload = {
        "payload": {
            "training_session_attendance_ids": att_uuids,
            "attendance": "attended",
        }
    }
    mark_attended = requests.post(attendance_url, headers=HEADERS, json=payload)
    return mark_attended.status_code
|
|
|
|
def generate_random_email():
    """Return a random email address built with the standard library only.

    The local part is 6 to 12 characters drawn from lowercase ASCII letters
    and digits; the domain is picked from a fixed list of five.
    """
    domains = ["gmail.com", "yahoo.com", "outlook.com", "example.com", "test.com"]
    alphabet = string.ascii_lowercase + string.digits

    # Draw the length first, then one character at a time, then the domain.
    length = random.randint(6, 12)
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    local_part = ''.join(picked)

    return f"{local_part}@{random.choice(domains)}"
|
|
|
|
if __name__ == "__main__":
    # Exercise the flow with five random addresses plus one fixed address.
    emails = [generate_random_email() for _ in range(5)]
    emails.append('norm35@norm.com')

    lists = check_attendance(emails)  # returns registered_emails, not_registered_emails
    PPL_LISTS['to_be_marked'] = lists[0]
    check_if_exists(lists[1])

    # Register once nothing is left to create. This used to be a
    # `while not ...` loop calling register_for_session() without its
    # required argument, which raised TypeError on the first pass (and the
    # loop condition could never change anyway).
    if not PPL_LISTS['to_be_created']:
        register_for_session(PPL_LISTS['to_be_registered'])
|
|
|
|
|
|
# TODO: After the missing people are created, run check_if_exists on them again and feed the result straight into the register function. That should make the data flow more reusable.
|
|
# This could be a loop that runs until every missing email has been added; those people are then grouped back with the other non-registered users and flow through the registration functions.
|
|
"""
|
|
So the flow would be:
|
|
X get CSV and grab all emails
|
|
X compare emails to registrations
|
|
emails that are registered go over to the mark as attended bucket.
|
|
emails that are not registered, are checked if they exist
|
|
if they exist, they are on hold.
|
|
if they don't exist, they are created
|
|
people are now all mixed with the on hold people
|
|
all are registered
|
|
now the original list can be mark as attended
|
|
"""
|
|
# mark_status = mark_attendance(lists[0])
|
|
# in_system, out_system = check_if_exists(lists[1])
|
|
# check_if_exists()
|