AI Sample Managers:
Algonaut Atlas.
XLN Audio - XO.
https://www.producthunt.com/alternatives/sononym
Python ML:
https://hackernoon.com/intro-to-audio-analysis-recognizing-sounds-using-machine-learning-qy2r3ufl
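The linked article walks through recognizing sounds with machine learning in Python. A minimal sketch of that kind of feature extraction, assuming librosa is installed (the path below is just a placeholder, not one of my files):
# mfcc_sketch.py - illustrative only, not part of the sorting scripts below
import librosa
# Load a clip at its native sample rate and compute 13 MFCCs,
# a common starting feature set for a sound classifier.
y, sr = librosa.load(r'F:\Audio\processed\Loops_Unsorted\example.wav', sr=None)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
print(mfccs.shape)  # (13, number_of_frames)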
Sample name managers:
https://www.adsrsounds.com/product/software/adsr-sample-manager/
Steps to sort samples:
1_find_still_compressed.py
2_uncompress.py
3_find_carrot_and_replace.py
4_list_files_multi.py
5_csv_of_dups.py
6_check_dups.py
7_prepare_copy.py
8_copy.py
Go through the archive folders to see which downloads still need to be uncompressed.
# 1_find_still_compressed.py
import csv
import datetime
import os
PROCESS_FOLDERNAME = 'Loops_Unsorted'
LOCATION = r'F:\Audio\processed'
def save_paths(csv_filename, paths):
    output_csv_path = os.path.join(LOCATION, csv_filename)
    output_csv_file = open(output_csv_path, 'w', newline='', encoding="utf-8")
    output_csv_fieldnames = ['paths']
    writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
    header = {value: value for value in output_csv_fieldnames}
    writer.writerow(header)
    rows = []
    for path in paths:
        row = {'paths': path}
        rows.append(row)
    for row in rows:
        writer.writerow(row)
    output_csv_file.close()
if __name__ == "__main__":
    process_folder_path = os.path.join(LOCATION, PROCESS_FOLDERNAME)
    paths = {}
    for root, dirs, files in os.walk(process_folder_path):
        for file in files:
            if file.lower().endswith(('.rar', '.zip')):
                # Skip archives that already have a folder of the same name next to them.
                if file.rsplit('.', 1)[0] not in dirs:
                    #print(dirs)
                    paths[os.path.join(root, file)] = True
    csv_filename = "compressed_{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    save_paths(csv_filename, paths)
Uncompress the archives found in the previous step. The script below loads compressed_<folder>.csv, so rename (or copy) the timestamped CSV from step 1 to that name first. pyunpack relies on patool, which in turn needs an external extractor such as 7-Zip for .rar files.
#2_uncompress.py
import csv
import datetime
import os
from pyunpack import Archive
import sys
PROCESS_FOLDERNAME = 'Loops_Unsorted'
LOCATION = r'F:\Audio\processed'
if __name__ == "__main__":
    #Load compressed files.
    csv_filename = "compressed_{}.csv".format(PROCESS_FOLDERNAME)
    csv_path = os.path.join(LOCATION, csv_filename)
    compressed_paths = {}
    if os.path.exists(csv_path):
        csv_file = open(csv_path, 'r', encoding="utf-8")
        reader = csv.DictReader(csv_file, delimiter='^')
        for row in reader:
            compressed_paths[row['paths']] = True  # column name matches 1_find_still_compressed.py
        csv_file.close()
    else:
        print("File not found: {}".format(csv_path))
        sys.exit()
    total = len(compressed_paths)
    for idx, compressed_path in enumerate(compressed_paths, 1):
        print("{}/{}".format(idx, total))
        directory = os.path.dirname(compressed_path)
        new_dir = os.path.basename(compressed_path).rsplit('.', 1)[0]
        new_path = os.path.join(directory, new_dir)
        if os.path.exists(new_path):
            print("Already exists: {}\n{}\n".format(compressed_path, new_path))
        else:
            print("{}\n-->\n{}\n".format(compressed_path, new_path))
            os.mkdir(new_path)
            Archive(compressed_path).extractall(new_path)
Rename files that contain carets (^), replacing ^ with _, since ^ is used as the delimiter in the CSVs written by the later scripts.
# 3_find_carrot_and_replace.py
import os
import shutil
PATH = r'F:\Audio\processed\Loops_Unsorted'
if __name__ == "__main__":
    count = 0
    for root, dirs, files in os.walk(PATH):
        for file in files:
            count += 1
            if '^' in file:
                print(os.path.join(root, file))
                src = os.path.join(root, file)
                dst = os.path.join(root, file.replace('^', '_'))
                shutil.move(src, dst)
    print("{} files looked at.".format(count))
Get file hashes.
# 4_list_files_multi.py
import concurrent.futures
import csv
import datetime
import hashlib
import keyboard
import multiprocessing as mp
import os
import random
import shutil
import sys
import time
PROCESS_FOLDERNAME = 'Loops_Unsorted'
LOCATION = r'F:\Audio\processed'
def create_checksum(file_path):
    hash = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash.update(chunk)
    return(hash.hexdigest())
def hash_file(file_path):
    #time.sleep(3)
    return(file_path, create_checksum(file_path))
def create_chunks(list_name, n):
    for i in range(0, len(list_name), n):
        yield list_name[i:i + n]
# Print iterations progress
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {iteration}/{total} {percent}% {suffix} ', end = printEnd)
# Print New Line on Complete
if iteration == total:
print()
def save_hashes(csv_filename, paths_hashed):
    output_csv_path = os.path.join(LOCATION, csv_filename)
    output_csv_file = open(output_csv_path, 'w', newline='', encoding="utf-8")
    output_csv_fieldnames = ['hash', 'path']
    writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
    header = {value: value for value in output_csv_fieldnames}
    writer.writerow(header)
    rows = []
    for path in paths_hashed:
        row = {'hash': paths_hashed[path], 'path': path}
        rows.append(row)
    for row in rows:
        writer.writerow(row)
    output_csv_file.close()
def save_extensions(csv_filename, exts):
    output_csv_path = os.path.join(LOCATION, csv_filename)
    output_csv_file = open(output_csv_path, 'w', newline='', encoding="utf-8")
    output_csv_fieldnames = ['extension']
    writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
    header = {value: value for value in output_csv_fieldnames}
    writer.writerow(header)
    rows = []
    for ext in exts:
        row = {'extension': ext}
        rows.append(row)
    for row in rows:
        writer.writerow(row)
    output_csv_file.close()
if __name__ == "__main__":
    confirmation = ''
    while confirmation not in ['Y', 'N']:
        confirmation = input('Did you remember to run find_carrot_and_replace.py? (Y/N): ').upper()
    if confirmation == 'N':
        sys.exit()
    begin_time = datetime.datetime.now()
    begin_time_str = begin_time.strftime("%Y%m%d%H%M%S")
    print("Program Launched at: {}".format(begin_time))
    #Load existing hashed files.
    csv_hashed = {}
    output_csv_path = os.path.join(LOCATION, "{}.csv".format(PROCESS_FOLDERNAME))
    if os.path.exists(output_csv_path):
        csv_file = open(output_csv_path, 'r', encoding="utf-8")
        reader = csv.DictReader(csv_file, delimiter='^')
        for row in reader:
            csv_hashed[row['path']] = row['hash']
        csv_file.close()
    loaded_hashes = csv_hashed.copy()
    #Load all filenames.
    print("Load files.")
    _start = time.time()
    to_hash_paths = {}
    count = 0
    start_count = 0
    process_folder_path = os.path.join(LOCATION, PROCESS_FOLDERNAME)
    extensions = {}
    for root, dirs, files in os.walk(process_folder_path):
        for file in files:
            extensions[file.rsplit('.', 1)[-1]] = True
            path = os.path.join(root, file)
            if path not in loaded_hashes:
                to_hash_paths[path] = True
            else:
                start_count += 1
                del loaded_hashes[path]
            count += 1
            # if count >= 1000:
            #     break
        else:
            continue
        break
    total_hashes = count
    done_time = datetime.datetime.now()
    print("Done loading {} in {}".format(total_hashes, time.time() - _start))
    ####### Extensions.
    csv_filename = "extensions_{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    save_extensions(csv_filename, extensions)
    #Process hashes
    _start = time.time()
    _next_time_start = time.time()
    _save_time_start = time.time()
    rps = 0
    total_rps = 0
    average_rps = 1
    count_rps = 0
    files_hashed = {}
    count = start_count
    for file_p in to_hash_paths:
        filepath, hash_of_file = hash_file(file_p)
        files_hashed[filepath] = hash_of_file
        #print(count)
        printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
        count += 1
        if count % 10 == 0:
            count_rps += 1
            rps = 10/(time.time() - _next_time_start)
            _next_time_start = time.time()
            total_rps += rps
            average_rps = total_rps / count_rps
        if keyboard.is_pressed('q'):
            break
        if keyboard.is_pressed('s'):
            csv_filename = "{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
            all_hashed = csv_hashed | files_hashed
            save_hashes(csv_filename, all_hashed)
        if (time.time() - _save_time_start) > 300:
            csv_filename = "{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
            all_hashed = csv_hashed | files_hashed
            save_hashes(csv_filename, all_hashed)
            _save_time_start = time.time()
    printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
    print("\n\nLoop took: {}".format(time.time() - _start))
# confirmation = ''
# while confirmation not in ['Y', 'N']:
# confirmation = input('Continue? (Y/N): ').upper()
# if confirmation == 'N':
# sys.exit()
####### Thread Pool Executor
# _start = time.time()
# files_hashed = {}
# with concurrent.futures.ThreadPoolExecutor() as executor:
# futures = []
# for filepath in to_hash_paths:
# futures.append(executor.submit(hash_file, file_path=filepath))
# count = 1
# for future in concurrent.futures.as_completed(futures):
# filepath, hash_of_file = future.result()
# files_hashed[filepath] = hash_of_file
# #print(count)
# count += 1
# print("ThreadPoolExecutor took: {}".format(time.time() - _start))
# # print(files_hashed)
# confirmation = ''
# while confirmation not in ['Y', 'N']:
# confirmation = input('Continue? (Y/N): ').upper()
# if confirmation == 'N':
# sys.exit()
####### Multi Processing
# _start = time.time()
# cpu_c = mp.cpu_count()
# pool = mp.Pool()
# files_hashed = {}
# count = start_count
# exit_now = False
# print("Load Pool")
# _next_time_start = time.time()
# rps = 0
# total_rps = 0
# average_rps = 1
# count_rps = 0
# for list_of_files in create_chunks(list(to_hash_paths), cpu_c):
# pool.map(hash_file, list_of_files)
# hashes = pool.map(hash_file, list_of_files)
# for file_path, hash_of_file in hashes:
# files_hashed[file_path] = hash_of_file
# #print(count)
# printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
# count += 1
# if count % 10 == 0:
# count_rps += 1
# rps = 10/(time.time() - _next_time_start)
# _next_time_start = time.time()
# total_rps += rps
# average_rps = total_rps / count_rps
# if keyboard.is_pressed('q'):
# break
# pool.close()
# printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
# print("\n\nMultiprocessing took: {}".format(time.time() - _start))
#print(all_hashed)
#Save hashes of files in CSV.
#Save hashes.
csv_filename = "{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
all_hashed = csv_hashed | files_hashed
save_hashes(csv_filename, all_hashed)
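Note: the dict merge csv_hashed | files_hashed needs Python 3.9 or newer. The commented-out ThreadPoolExecutor and multiprocessing experiments above are one way to speed hashing up; a minimal standalone sketch of the same idea with ProcessPoolExecutor (the helper names and folder here are illustrative, not part of the numbered scripts):
# parallel_hash_sketch.py - illustrative only
import concurrent.futures
import hashlib
import os
def md5_of_file(path):
    # Same chunked MD5 as create_checksum() above.
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            digest.update(chunk)
    return path, digest.hexdigest()
def hash_paths_parallel(paths, workers=None):
    # Hash files across CPU cores; returns {path: md5 hexdigest}.
    results = {}
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        for path, digest in executor.map(md5_of_file, paths, chunksize=32):
            results[path] = digest
    return results
if __name__ == "__main__":
    folder = r'F:\Audio\processed\Loops_Unsorted'
    paths = [os.path.join(root, name) for root, dirs, files in os.walk(folder) for name in files]
    print("{} files hashed".format(len(hash_paths_parallel(paths))))
Whether this is actually faster depends on the drive: on a spinning disk the disk, not the CPU, is usually the bottleneck.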
List duplicate files (paths that share the same MD5 hash).
# 5_csv_of_dups.py
import csv
import datetime
import os
import sys
PROCESS_FOLDERNAME = 'Loops_Unsorted'
LOCATION = r'F:\Audio\processed'
if __name__ == "__main__":
    #Load existing hashed files.
    csv_hashed = {}
    output_csv_path = os.path.join(LOCATION, "{}.csv".format(PROCESS_FOLDERNAME))
    if os.path.exists(output_csv_path):
        csv_file = open(output_csv_path, 'r', encoding="utf-8")
        reader = csv.DictReader(csv_file, delimiter='^')
        for row in reader:
            csv_hashed[row['path']] = row['hash']
        csv_file.close()
    else:
        print("Not found: {}".format(output_csv_path))
        sys.exit()
    # Group hashes.
    multi_hashed = {}
    for path in csv_hashed:
        multi_hashed.setdefault(csv_hashed[path], []).append(path)
    # Save Grouped Hashes to CSV.
    output_csv_path = os.path.join(LOCATION, "{}_multi_found_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
    output_csv_file = open(output_csv_path, 'w', newline='', encoding="utf-8")
    output_csv_fieldnames = ['hash', 'path']
    writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
    header = {value: value for value in output_csv_fieldnames}
    writer.writerow(header)
    rows = []
    for hash_ in multi_hashed:
        if len(multi_hashed[hash_]) > 1:
            row = {'hash': hash_, 'path': multi_hashed[hash_][0]}
            rows.append(row)
            for path in multi_hashed[hash_][1:]:
                # Repeat the hash on every row; 6_check_dups.py and 7_prepare_copy.py
                # expect each row to carry its hash.
                row = {'hash': hash_, 'path': path}
                rows.append(row)
    for row in rows:
        writer.writerow(row)
    output_csv_file.close()
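The <folder>_multi_found CSV keeps rows for the same hash adjacent, one row per duplicate copy, e.g. (hashes and paths below are made up for illustration):
hash^path
0cc175b9c0f1b6a831c399e269772661^F:\Audio\processed\Loops_Unsorted\PackA\kick_01.wav
0cc175b9c0f1b6a831c399e269772661^F:\Audio\processed\Loops_Unsorted\PackB\kick_01.wav
92eb5ffee6ae2fec3ad71c777531578f^F:\Audio\processed\Loops_Unsorted\PackA\snare_07.wav
92eb5ffee6ae2fec3ad71c777531578f^F:\Audio\processed\Loops_Unsorted\PackC\snare.wav
Rename the timestamped file to Loops_Unsorted_multi_found.csv before running the next two scripts, which load it under that name.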
Copy the duplicates into per-hash folders so they can be checked by hand.
# 6_check_dups.py
import csv
import datetime
import os
import shutil
import sys
DUPS_FOLDERNAME = 'Duplicates'
LOCATION = r'F:\Audio\processed'
if __name__ == "__main__":
    #Load dup hashed files.
    csv_hashed = []
    dup_csv_path = os.path.join(LOCATION, "{}.csv".format('Loops_Unsorted_multi_found'))
    if os.path.exists(dup_csv_path):
        csv_file = open(dup_csv_path, 'r', encoding="utf-8")
        reader = csv.DictReader(csv_file, delimiter='^')
        for row in reader:
            filename, file_extension = os.path.splitext(row['path'])
            if file_extension.lower() in ['.mp4', '.mp3', '.wav', '.aif', '.aiff']:
                csv_hashed.append([row['hash'], row['path']])
        csv_file.close()
    else:
        print("Not found: {}".format(dup_csv_path))
        sys.exit()
    # Determine the extension used by each hash.
    hash_extensions = {}
    for item in csv_hashed:
        filename, file_extension = os.path.splitext(item[1])
        #print(filename, file_extension)
        hash_extensions.setdefault(item[0], []).append(file_extension.lower())
    chosen_hash_extensions = {}
    for hash in hash_extensions:
        extensions = hash_extensions[hash]
        extension = set(extensions)
        if len(extension) > 1:
            print("Too many extensions: {} {}".format(hash, ", ".join(extension)))
            sys.exit()
        chosen_hash_extensions[hash] = list(extension)[0]
    # for hash in chosen_hash_extensions:
    #     print("Hash: {} Extension: {}".format(hash, chosen_hash_extensions[hash]))
    # Copy files
    count_of_hash = ''
    count_copies = 0
    for item in csv_hashed:
        hash = item[0]
        if not hash:
            print("No hash found: {}".format(item[1]))
            sys.exit()
        if count_of_hash != hash:
            count = 1
            count_of_hash = hash
            dest_dir = os.path.join(LOCATION, DUPS_FOLDERNAME, hash)
            if not os.path.exists(dest_dir):
                os.mkdir(dest_dir)
        new_filename = '{}{}'.format(count, chosen_hash_extensions[hash])
        dest = os.path.join(dest_dir, new_filename)
        print("\n{}\n-->\n{}\n".format(item[1], dest))
        if not os.path.exists(dest):
            shutil.copy(item[1], dest)
        else:
            print("Path exists: {}".format(dest))
        print("{}/{}".format(count_copies, len(csv_hashed)))
        count += 1
        count_copies += 1
Prepare the copy plan (source and destination paths) for sorting files by type:
# 7_prepare_copy.py
import csv
import datetime
import os
import shutil
import sys
PROCESS_FOLDERNAME = 'Loops_Unsorted'
TYPE_FOLDERNAME = 'Type'
LOCATION = r'F:\Audio\processed'
if __name__ == "__main__":
    #Load dup hashed files.
    csv_dup_hashed = []
    dup_csv_path = os.path.join(LOCATION, "{}.csv".format('Loops_Unsorted_multi_found'))
    if os.path.exists(dup_csv_path):
        csv_file = open(dup_csv_path, 'r', encoding="utf-8")
        reader = csv.DictReader(csv_file, delimiter='^')
        for row in reader:
            filename, file_extension = os.path.splitext(row['path'])
            if file_extension.lower() in ['.mp4', '.mp3', '.wav', '.aif', '.aiff']:
                csv_dup_hashed.append([row['hash'], row['path']])
        csv_file.close()
    else:
        print("Not found: {}".format(dup_csv_path))
        sys.exit()
    # Determine the extension and a representative path for each duplicate hash.
    hash_path = {}
    hash_extensions = {}
    for item in csv_dup_hashed:
        filename, file_extension = os.path.splitext(item[1])
        #print(filename, file_extension)
        hash_extensions.setdefault(item[0], []).append(file_extension.lower())
        hash_path[item[0]] = item[1]
    chosen_hash_extensions = {}
    for hash in hash_extensions:
        extensions = hash_extensions[hash]
        extension = set(extensions)
        if len(extension) > 1:
            print("Too many extensions: {} {}".format(hash, ", ".join(extension)))
            sys.exit()
        chosen_hash_extensions[hash] = list(extension)[0]
    #Load existing hashed files.
    csv_hashed_to_copy = {}
    hash_csv_path = os.path.join(LOCATION, "{}.csv".format(PROCESS_FOLDERNAME))
    if os.path.exists(hash_csv_path):
        csv_file = open(hash_csv_path, 'r', encoding="utf-8")
        reader = csv.DictReader(csv_file, delimiter='^')
        for row in reader:
            filename, file_extension = os.path.splitext(row['path'])
            if file_extension.lower() in ['.mp4', '.mp3', '.wav', '.aif', '.aiff']:
                if row['hash'] in chosen_hash_extensions:
                    csv_hashed_to_copy[row['hash']] = hash_path[row['hash']]
                else:
                    csv_hashed_to_copy[row['hash']] = row['path']
        csv_file.close()
    else:
        print("File not found: {}".format(hash_csv_path))
        sys.exit()
    types = {}
    count_files = 0
    for hash in csv_hashed_to_copy:
        filename, file_extension = os.path.splitext(csv_hashed_to_copy[hash])
        types.setdefault(file_extension.lower(), {})[hash] = csv_hashed_to_copy[hash]
        count_files += 1
    copy_these = []
    for type in types:
        print(type)
        group = 1
        count = 1
        for hash in types[type]:
            dest_folder = os.path.join(LOCATION, TYPE_FOLDERNAME, type[1:], "{:05d}".format(group))
            #if not os.path.exists(dest_folder):
            #    os.makedirs(dest_folder)
            dest_filename = "{:05d}{}".format(count, type)
            src = types[type][hash]
            dest = os.path.join(dest_folder, dest_filename)
            #shutil.copy(src, dest)
            copy_these.append([src, dest])
            if count >= 1000:
                group += 1
                count = 1
            else:
                count += 1
    # Save Grouped Hashes to CSV.
    output_csv_path = os.path.join(LOCATION, "{}_copy_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
    output_csv_file = open(output_csv_path, 'w', newline='', encoding="utf-8")
    output_csv_fieldnames = ['src', 'dst']
    writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
    header = {value: value for value in output_csv_fieldnames}
    writer.writerow(header)
    rows = []
    for src, dst in copy_these:
        row = {'src': src, 'dst': dst}
        rows.append(row)
    for row in rows:
        writer.writerow(row)
    output_csv_file.close()
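8_copy.py itself is not reproduced in these notes. A minimal sketch of what that last step presumably does (load the src/dst CSV written by 7_prepare_copy.py, create the destination folders, copy the files), assuming the timestamped CSV has been renamed to <folder>_copy.csv like in the earlier steps. Treat it as an illustration, not the original script:
# 8_copy.py (sketch only)
import csv
import os
import shutil
import sys
PROCESS_FOLDERNAME = 'Loops_Unsorted'
LOCATION = r'F:\Audio\processed'
if __name__ == "__main__":
    copy_csv_path = os.path.join(LOCATION, "{}_copy.csv".format(PROCESS_FOLDERNAME))
    if not os.path.exists(copy_csv_path):
        print("Not found: {}".format(copy_csv_path))
        sys.exit()
    with open(copy_csv_path, 'r', encoding="utf-8") as csv_file:
        reader = csv.DictReader(csv_file, delimiter='^')
        copy_these = [(row['src'], row['dst']) for row in reader]
    total = len(copy_these)
    for idx, (src, dst) in enumerate(copy_these, 1):
        print("{}/{} {}\n-->\n{}\n".format(idx, total, src, dst))
        # Create the grouped destination folder, then copy unless it is already there.
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        if not os.path.exists(dst):
            shutil.copy(src, dst)
        else:
            print("Path exists: {}".format(dst))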