import json
import os
import glob
import ldap3
import pandas
class LDAPBatchAdd:
    """Add LDAP users in bulk, cloning the DN layout and objectClass
    of a pre-existing "template" entry."""

    def __init__(self, conf):
        """Connect to the LDAP server and load the template entry.

        conf: dict with keys "server", "userdn", "userpassword" and
        "template_dn"; missing keys fall back to empty strings.

        Raises RuntimeError if the template entry cannot be found.
        """
        # Defaults, overridden by the caller-supplied configuration.
        self.config = {
            "server": "",
            "userdn": "",
            "userpassword": "",
            "template_dn": "",
        }
        self.config.update(conf)
        # Bind to the directory server.
        self.server = ldap3.Server(self.config["server"])
        self.conn = ldap3.Connection(
            self.server,
            user=self.config["userdn"],
            password=self.config["userpassword"],
            auto_bind=True)
        # Fetch the template entry (BASE scope: exactly the configured DN).
        # Explicit raise instead of `assert`: asserts are stripped under
        # `python -O`, which would silently skip this validation.
        found = self.conn.search(
            search_base=self.config["template_dn"],
            search_filter="(objectClass=*)",
            search_scope=ldap3.BASE,
            attributes=ldap3.ALL_ATTRIBUTES)
        if not found:
            raise RuntimeError("Template user not found")
        self.template = self.conn.entries[0]

    def add_user(self, fields):
        """Add one user entry built from `fields` (attribute -> value dict).

        The new DN reuses the template's DN, replacing only the value of the
        leading RDN (e.g. ``uid=...``) with the matching value from `fields`;
        if that attribute is absent from `fields`, the template's own value
        is kept. Returns the result of ``Connection.add`` (truthy on success).
        """
        dn_components = self.template.entry_dn.split(",")
        rdn_attr, template_value = dn_components[0].split("=")
        new_dn = ",".join(
            [rdn_attr + "=" + fields.get(rdn_attr, template_value)]
            + dn_components[1:])
        # Clone the template's objectClass; `fields` supplies the attributes.
        return self.conn.add(new_dn, self.template.objectClass.value, fields)
def main():
    """Batch-import users from data/import/*.csv into LDAP.

    Reads connection settings from data/config.json (relative to this
    script), runs each CSV row through the autocomplete plugins, adds the
    user via LDAPBatchAdd, and writes the successfully added rows to
    data/import_result/ under the same file name.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    # `with` closes the file automatically; the explicit close() the old
    # code had inside the block was redundant.
    with open(os.path.join(script_dir, "data/config.json"), "r") as f:
        conf = json.load(f)
    batch_add = LDAPBatchAdd(conf)
    # Each plugin is instantiated with its own config section, keyed by the
    # plugin class name (empty dict when the section is absent).
    import autocomplete
    plugins = [plugin(conf.get(plugin.__name__, {}))
               for plugin in autocomplete.plugins]
    # Import every CSV file found in the input directory.
    for csv_filename in glob.glob(os.path.join(script_dir, "data/import/*.csv")):
        added_users = []
        df = pandas.read_csv(csv_filename)
        for _, row in df.iterrows():
            fields = row.to_dict()
            # Let each plugin fill in / derive missing fields.
            for plugin in plugins:
                fields = plugin.run(fields)
            if batch_add.add_user(fields):
                added_users.append(fields)
            else:
                print("Failed to add: ", fields)
        print("{} user(s) added.".format(len(added_users)))
        # Record the successfully added rows as a result CSV.
        out_path = os.path.join(script_dir, "data/import_result")
        os.makedirs(out_path, exist_ok=True)
        out_filename = os.path.join(out_path, os.path.basename(csv_filename))
        pandas.DataFrame.from_dict(added_users).to_csv(out_filename)
# Entry-point guard: run the batch import only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()