v3 - fix: render all HTML content as well; use chunks and multithreading

friedemann.blume 2024-07-20 10:17:52 +02:00
parent ab3bf98fa7
commit 5959534447
2 changed files with 73 additions and 9 deletions

BIN .DS_Store (vendored)
Binary file not shown.

@@ -8,13 +8,18 @@ from markdownify import markdownify
 from watchdog.observers import Observer
 from watchdog.events import FileSystemEventHandler
 from tqdm import tqdm
+from tempfile import mkdtemp
+import shutil
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 # Configuration
 input_dir = '/mnt/input'
 output_dir = '/mnt/output'
+chunk_size = 100  # Number of emails per chunk
+max_workers = 4  # Number of threads
 
 # Setup logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
 
 # Ensure output directory exists
@@ -38,7 +43,9 @@ def format_date(date_str):
 
 def extract_email_content(email):
     """Extract the email content, prioritizing text/plain over text/html."""
+    logger.debug("Extracting email content")
     if email.is_multipart():
+        logger.debug("Email is multipart")
         for part in email.walk():
             content_type = part.get_content_type()
             disposition = str(part.get('Content-Disposition'))
@@ -47,6 +54,7 @@ def extract_email_content(email):
             elif content_type == 'text/html' and 'attachment' not in disposition:
                 return markdownify(part.get_payload(decode=True).decode(errors='ignore'))
     else:
+        logger.debug("Email is not multipart")
         content_type = email.get_content_type()
         if content_type == 'text/plain':
             return email.get_payload(decode=True).decode(errors='ignore')
@@ -85,28 +93,84 @@ def save_email_as_markdown(email, index, output_subdir):
     except Exception as e:
         logger.error(f"Error processing email {index + 1}: {e}")
 
+def split_mbox(mbox_file):
+    """Split the mbox file into smaller chunks."""
+    logger.debug("Splitting mbox file")
+    base_name = os.path.basename(mbox_file)
+    subdir_name = os.path.splitext(base_name)[0]  # Remove the .mbox extension
+
+    temp_dir = mkdtemp()
+    mbox = mailbox.mbox(mbox_file)
+
+    chunks = []
+    chunk_index = 0
+    email_index = 0
+    current_chunk = None
+    chunk_path = None
+
+    try:
+        for email in mbox:
+            if email_index % chunk_size == 0:
+                if current_chunk is not None:
+                    current_chunk.close()
+                chunk_path = os.path.join(temp_dir, f"{subdir_name}_chunk_{chunk_index}.mbox")
+                current_chunk = mailbox.mbox(chunk_path, create=True)
+                chunks.append(chunk_path)
+                chunk_index += 1
+            current_chunk.add(email)
+            email_index += 1
+        if current_chunk is not None:
+            current_chunk.close()
+    except Exception as e:
+        logger.error(f"Error splitting mbox file: {e}")
+        shutil.rmtree(temp_dir)
+        return []
+
+    logger.debug(f"Created {len(chunks)} chunks")
+    return chunks
+
 def convert_mbox_to_markdown(mbox_file):
     try:
-        # Create a subdirectory in the output directory with the name of the .mbox file
-        base_name = os.path.basename(mbox_file)
+        logger.debug("Converting mbox to markdown")
+        chunks = split_mbox(mbox_file)
+        if not chunks:
+            logger.error(f"Failed to split mbox file: {mbox_file}")
+            return
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
+            for future in tqdm(as_completed(futures), total=len(futures), desc="Processing chunks"):
+                chunk = futures[future]
+                try:
+                    future.result()
+                    logger.info(f"Completed processing chunk: {chunk}")
+                except Exception as e:
+                    logger.error(f"Error processing chunk {chunk}: {e}")
+    except Exception as e:
+        logger.error(f"Error processing mbox file {mbox_file}: {e}")
+
+def process_chunk(chunk_file):
+    try:
+        logger.debug("Processing chunk")
+        base_name = os.path.basename(chunk_file)
         subdir_name = os.path.splitext(base_name)[0]  # Remove the .mbox extension
         output_subdir = os.path.join(output_dir, subdir_name)
         os.makedirs(output_subdir, exist_ok=True)
 
-        logger.info(f"Processing .mbox file: {mbox_file}")
-        mbox = mailbox.mbox(mbox_file)
+        logger.info(f"Processing chunk file: {chunk_file}")
+        mbox = mailbox.mbox(chunk_file)
 
         # Show progress bar
         total_emails = len(mbox)
-        logger.info(f"Total emails to process: {total_emails}")
+        logger.info(f"Total emails to process in chunk: {total_emails}")
 
-        for i, email in tqdm(enumerate(mbox), total=total_emails, desc='Converting emails'):
+        for i, email in enumerate(mbox):
             logger.info(f"Processing email {i + 1}/{total_emails}")
             save_email_as_markdown(email, i, output_subdir)
 
-        logger.info(f"Completed processing {mbox_file}")
+        logger.info(f"Completed processing chunk file: {chunk_file}")
     except Exception as e:
-        logger.error(f"Error processing mbox file {mbox_file}: {e}")
+        logger.error(f"Error processing chunk file {chunk_file}: {e}")
 
 class MboxFileHandler(FileSystemEventHandler):
     def on_created(self, event):
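
For reference, the chunk-and-thread-pool pattern this commit introduces can be exercised on its own. The sketch below is not part of the commit: the input path, chunk size, worker count, and the trivial counting worker are placeholders chosen for illustration only.

# Standalone sketch of splitting an mbox into chunk files and processing
# the chunks concurrently, mirroring the approach added in this commit.
# All paths and sizes are placeholders, not values from the repository.
import mailbox
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tempfile import mkdtemp

CHUNK_SIZE = 100   # emails per chunk (placeholder)
MAX_WORKERS = 4    # worker threads (placeholder)

def split_into_chunks(mbox_path):
    """Copy every CHUNK_SIZE messages into their own temporary .mbox file."""
    temp_dir = mkdtemp()
    chunks, current = [], None
    for i, message in enumerate(mailbox.mbox(mbox_path)):
        if i % CHUNK_SIZE == 0:
            if current is not None:
                current.close()
            path = os.path.join(temp_dir, f"chunk_{i // CHUNK_SIZE}.mbox")
            current = mailbox.mbox(path, create=True)
            chunks.append(path)
        current.add(message)
    if current is not None:
        current.close()
    return chunks

def process_chunk(chunk_path):
    """Placeholder worker: just count the messages in one chunk."""
    return len(mailbox.mbox(chunk_path))

if __name__ == "__main__":
    paths = split_into_chunks("/mnt/input/example.mbox")  # placeholder path
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_chunk, p): p for p in paths}
        for future in as_completed(futures):
            print(futures[future], future.result())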