Argh, I haven't been at my home office and there was Cohesion this week, so I keep forgetting to push my changesgit status Todos, cleaned some dirs, and updated a script or too.

This commit is contained in:
Norm Rasmussen
2024-07-19 08:53:30 -04:00
parent dfeb4a8292
commit bf0244b2fe
7 changed files with 181 additions and 0 deletions

View File

@ -0,0 +1 @@
ZENJOB = "LIXqtHXEqcXHyN0EtezngnpzA"

Binary file not shown.

View File

@ -0,0 +1,84 @@
import requests
import time
from playwright.sync_api import sync_playwright, Playwright
import pandas as pd
import Apikeys
IMPORTFILE = "~/Downloads/Internal Zenjob Testing group.csv"
APIKEY = Apikeys.ZENJOB
HEADERS = {"X-Api-Key": APIKEY, "accept": "application/json"}
ZENJOBURL = "https://zenjob.northpass.com/?uid="
BASEURL = "https://api.northpass.com/v2/"
def delete_people_first():
print("Deleting everyone with a zenjob email address...")
count = 0
delete_list = []
while True:
count += 1
person_url = f"{BASEURL}people?filter[sso_uid][cont]=zenjob.com&filter[partnerships_type][not_eq]=Partnerships::Admin&include=partnerships&page={count}"
person_resp = requests.get(person_url, headers=HEADERS)
persjson = person_resp.json()
nextlink = persjson["links"]
for people in persjson["data"]:
learneruuid = people["id"]
delete_list.append(learneruuid)
if "next" not in nextlink:
break
del_load = {"payload": delete_list}
print(len(delete_list))
# del_url = f"{BASEURL}bulk/people/delete"
print("Running Post Func for Deleting Users")
# post_func(del_url, del_load)
def run(playwright: Playwright):
df = pd.read_csv(IMPORTFILE)
emails = df["Work Email"].values.tolist()
# for email in emails:
# webkit = playwright.webkit
# browser = webkit.launch()
# context = browser.new_context()
# page = context.new_page()
# # page.goto(f"{BASEURL}{email}")
# print(f"{ZENJOBURL}{email}")
# context.close()
add_ppl_to_groups(emails)
def add_ppl_to_groups(emails):
print("Assuming everyone was created with playwright, now I'm going to add everyone to the group.")
learner_list = []
for person in emails:
get_person_url = f"{BASEURL}people?filter[email][eq]={person}"
get_person_resp = requests.get(get_person_url, headers=HEADERS)
person_json = get_person_resp.json()
learner_uuid = person_json["data"][0]["id"]
learner_list.append(learner_uuid)
bulk_to_groups_url = f"{BASEURL}bulk/people/membership"
bulk_payload = {"payload": {"person_ids": learner_list, "group_ids": ["940a5d24-32af-45f1-8ed4-8a6b4689d9c9"]}}
print("Running Post Func for Bulk Add to Groups")
# post_func(bulk_to_groups_url, bulk_payload)
def post_func(url, payload):
try:
req = requests.post(url, headers=HEADERS, json=payload)
except (HTTPError, ExceptionError) as e:
print(e)
finally:
print(req.status_code)
print(req.text)
if __name__ == "__main__":
delete_people_first()
time.sleep(5)
print("Sleeping to allow deletions to process.")
with sync_playwright() as playwright:
run(playwright)