"""Watch a directory for .mbox files and convert each email to a Markdown file.

Each detected mailbox is split into fixed-size chunk mailboxes in a temporary
directory, and the chunks are converted in parallel by a thread pool. One
Markdown file is written per email, named from its (sanitized) subject,
sender, first recipient and date.
"""

import logging
import mailbox
import os
import re
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from email.utils import mktime_tz, parsedate_tz
from tempfile import mkdtemp

from markdownify import markdownify
from tqdm import tqdm
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

# Configuration
input_dir = '/mnt/input'
output_dir = '/mnt/output'
chunk_size = 100  # Number of emails per chunk
max_workers = 4   # Number of threads

# Setup logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)


def sanitize_filename(filename):
    """Sanitize the filename to remove characters invalid on common filesystems."""
    return re.sub(r'[<>:"/\\|?*\x00-\x1F]', '', filename)


def format_date(date_str):
    """Format an RFC 2822 date string as ``YYYY-mm-dd_HH-MM-SS`` for filenames.

    Returns ``'NoDate'`` when the header is missing or cannot be parsed.
    """
    try:
        parsed_date = parsedate_tz(date_str)
        if parsed_date is not None:
            timestamp = mktime_tz(parsed_date)
            return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d_%H-%M-%S')
    except Exception as e:
        logger.error(f"Error formatting date: {e}")
    return 'NoDate'


def extract_email_content(email):
    """Extract the email body, prioritizing text/plain over text/html.

    HTML parts are converted to Markdown via markdownify; parts marked as
    attachments are skipped. Returns a placeholder string when no usable
    body part is found.
    """
    logger.debug("Extracting email content")
    if email.is_multipart():
        logger.debug("Email is multipart")
        for part in email.walk():
            content_type = part.get_content_type()
            disposition = str(part.get('Content-Disposition'))
            if 'attachment' in disposition:
                continue
            payload = part.get_payload(decode=True)
            # Bugfix: get_payload(decode=True) can return None (e.g. for
            # multipart container parts) -- guard before calling .decode().
            if payload is None:
                continue
            if content_type == 'text/plain':
                return payload.decode(errors='ignore')
            elif content_type == 'text/html':
                return markdownify(payload.decode(errors='ignore'))
    else:
        logger.debug("Email is not multipart")
        content_type = email.get_content_type()
        payload = email.get_payload(decode=True)
        if payload is not None:
            if content_type == 'text/plain':
                return payload.decode(errors='ignore')
            elif content_type == 'text/html':
                return markdownify(payload.decode(errors='ignore'))
    return "No content available"


def save_email_as_markdown(email, index, output_subdir):
    """Write a single email to *output_subdir* as a Markdown file.

    The filename is built from the sanitized subject, sender display name,
    first recipient and formatted date. Errors are logged, not raised, so
    one bad message does not abort the whole chunk.
    """
    logger.info(f"Starting to process email {index + 1}")
    try:
        # NOTE(review): .get() defaults apply only when the header is absent;
        # a present-but-encoded header may be a Header object -- assumed to
        # behave as str here, as in the original. TODO confirm on real data.
        subject = email.get('subject', 'No Subject')
        date = email.get('date', 'No Date')
        sender = email.get('from', 'Unknown Sender')
        recipients = email.get('to', 'Unknown Recipient')

        # Sanitize and format the filename components.
        sanitized_subject = sanitize_filename(subject)
        sanitized_sender = sanitize_filename(sender.split('<')[0].strip())
        sanitized_recipients = sanitize_filename(recipients.split(',')[0].strip())
        formatted_date = format_date(date)

        filename = f"{sanitized_subject} - {sanitized_sender} - {sanitized_recipients} - {formatted_date}.md"
        filename = os.path.join(output_subdir, filename)

        # Extract email content.
        body_markdown = extract_email_content(email)

        # Create a Markdown file for this email.
        with open(filename, 'w', encoding='utf-8') as file:
            file.write(f'# {subject}\n')
            file.write(f'*Date: {date}*\n')
            file.write(f'*From: {sender}*\n')
            file.write(f'*To: {recipients}*\n\n')
            file.write(body_markdown)

        # Bugfix: the original logged a literal "(unknown)" instead of the
        # path of the file that was actually written.
        logger.info(f"Saved email {index + 1} as Markdown: {filename}")
    except Exception as e:
        logger.error(f"Error processing email {index + 1}: {e}")


def split_mbox(mbox_file):
    """Split the mbox file into chunks of at most ``chunk_size`` messages.

    Chunk mailboxes are written to a fresh temporary directory; the list of
    chunk paths is returned (empty list on failure). The caller is
    responsible for removing the temporary directory when finished.
    """
    logger.debug("Splitting mbox file")
    base_name = os.path.basename(mbox_file)
    subdir_name = os.path.splitext(base_name)[0]  # Remove the .mbox extension
    temp_dir = mkdtemp()
    mbox = mailbox.mbox(mbox_file)
    chunks = []
    chunk_index = 0
    email_index = 0
    current_chunk = None
    try:
        for email in mbox:
            # Start a new chunk every chunk_size messages.
            if email_index % chunk_size == 0:
                if current_chunk is not None:
                    current_chunk.close()
                chunk_path = os.path.join(
                    temp_dir, f"{subdir_name}_chunk_{chunk_index}.mbox")
                current_chunk = mailbox.mbox(chunk_path, create=True)
                chunks.append(chunk_path)
                chunk_index += 1
            current_chunk.add(email)
            email_index += 1
        if current_chunk is not None:
            current_chunk.close()
    except Exception as e:
        logger.error(f"Error splitting mbox file: {e}")
        shutil.rmtree(temp_dir)
        return []
    finally:
        # Bugfix: the source mailbox was never closed by the original.
        mbox.close()
    logger.debug(f"Created {len(chunks)} chunks")
    return chunks


def convert_mbox_to_markdown(mbox_file):
    """Convert every email in *mbox_file* to Markdown files under output_dir.

    The mailbox is split into chunks which are processed concurrently by a
    thread pool; per-chunk failures are logged and do not stop the run.
    """
    try:
        logger.debug("Converting mbox to markdown")
        chunks = split_mbox(mbox_file)
        if not chunks:
            logger.error(f"Failed to split mbox file: {mbox_file}")
            return
        # All chunks live in the same mkdtemp() directory created by split_mbox.
        temp_dir = os.path.dirname(chunks[0])
        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(process_chunk, chunk): chunk
                           for chunk in chunks}
                for future in tqdm(as_completed(futures), total=len(futures),
                                   desc="Processing chunks"):
                    chunk = futures[future]
                    try:
                        future.result()
                        logger.info(f"Completed processing chunk: {chunk}")
                    except Exception as e:
                        logger.error(f"Error processing chunk {chunk}: {e}")
        finally:
            # Bugfix: the temporary chunk directory was never removed by the
            # original, leaking disk space on every conversion.
            shutil.rmtree(temp_dir, ignore_errors=True)
    except Exception as e:
        logger.error(f"Error processing mbox file {mbox_file}: {e}")


def process_chunk(chunk_file):
    """Convert every email in one chunk mailbox to Markdown files.

    Output goes to a subdirectory of ``output_dir`` named after the chunk
    file (without its .mbox extension).
    """
    try:
        logger.debug("Processing chunk")
        base_name = os.path.basename(chunk_file)
        subdir_name = os.path.splitext(base_name)[0]  # Remove the .mbox extension
        output_subdir = os.path.join(output_dir, subdir_name)
        os.makedirs(output_subdir, exist_ok=True)

        logger.info(f"Processing chunk file: {chunk_file}")
        mbox = mailbox.mbox(chunk_file)

        # Show progress via log lines.
        total_emails = len(mbox)
        logger.info(f"Total emails to process in chunk: {total_emails}")

        for i, email in enumerate(mbox):
            logger.info(f"Processing email {i + 1}/{total_emails}")
            save_email_as_markdown(email, i, output_subdir)
        logger.info(f"Completed processing chunk file: {chunk_file}")
    except Exception as e:
        logger.error(f"Error processing chunk file {chunk_file}: {e}")


class MboxFileHandler(FileSystemEventHandler):
    """Watchdog handler that converts any newly created .mbox file."""

    def on_created(self, event):
        if event.is_directory:
            return
        if event.src_path.endswith('.mbox'):
            # NOTE(review): on_created can fire while the file is still being
            # written; large mailboxes may be read incomplete -- confirm the
            # producer writes atomically (e.g. rename into place).
            logger.info(f"New .mbox file detected: {event.src_path}")
            convert_mbox_to_markdown(event.src_path)


def start_watching():
    """Watch ``input_dir`` for new .mbox files until interrupted (Ctrl-C)."""
    event_handler = MboxFileHandler()
    observer = Observer()
    observer.schedule(event_handler, path=input_dir, recursive=False)
    observer.start()
    logger.info(f"Watching for new .mbox files in {input_dir}...")
    try:
        while True:
            # Bugfix: the original busy-waited with `pass`, pinning a CPU
            # core; sleeping keeps the process alive at negligible cost.
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    start_watching()