mbox-to-markdown/mbox_to_markdown.py

115 lines
4.0 KiB
Python
Raw Normal View History

2024-07-19 10:44:15 +00:00
import mailbox
import os
2024-07-19 11:03:58 +00:00
import re
import logging
from datetime import datetime
from email.utils import parsedate_tz, mktime_tz
2024-07-19 10:44:15 +00:00
from markdownify import markdownify
2024-07-19 11:03:58 +00:00
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from tqdm import tqdm
2024-07-19 10:44:15 +00:00
# Configuration
2024-07-19 11:03:58 +00:00
# Directory that the watcher observes for newly created .mbox files.
input_dir = '/mnt/input'
# Root directory for results; each .mbox file gets its own subdirectory here.
output_dir = '/mnt/output'
# Setup logging: INFO and above, each record formatted as "time - level - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
2024-07-19 10:44:15 +00:00
# Ensure output directory exists. This runs when the module is loaded;
# exist_ok=True makes it a no-op if the directory is already present.
os.makedirs(output_dir, exist_ok=True)
2024-07-19 11:03:58 +00:00
def sanitize_filename(filename):
    """Strip characters that are not allowed in file names.

    Removes the punctuation ``< > : " / | ? *``, the backslash, and every
    ASCII control character in the range 0x00-0x1F. All other characters,
    including spaces, are kept as they are.
    """
    forbidden = r'[<>:"/\\|?*\x00-\x1F]'
    cleaned = re.sub(forbidden, '', filename)
    return cleaned
def format_date(date_str):
    """Turn an email ``Date`` header into a filename-friendly timestamp.

    The header is parsed with ``parsedate_tz`` and rendered as
    ``YYYY-MM-DD_HH-MM-SS`` using ``datetime.fromtimestamp`` (i.e. in the
    local timezone of the machine running the script).

    Returns the string ``'NoDate'`` when the header cannot be parsed, or when
    any step of the conversion raises (the error is logged).
    """
    try:
        date_tuple = parsedate_tz(date_str)
        if date_tuple is None:
            # Unparseable header: fall back to the placeholder.
            return 'NoDate'
        epoch_seconds = mktime_tz(date_tuple)
        local_moment = datetime.fromtimestamp(epoch_seconds)
        return local_moment.strftime('%Y-%m-%d_%H-%M-%S')
    except Exception as e:
        logger.error(f"Error formatting date: {e}")
    return 'NoDate'
def _decode_part(part):
    """Decode one non-multipart MIME part to text; return None if it has no payload."""
    raw = part.get_payload(decode=True)
    if raw is None:
        return None
    # Honour the charset declared in Content-Type; fall back to UTF-8.
    charset = part.get_content_charset() or 'utf-8'
    try:
        return raw.decode(charset, errors='ignore')
    except LookupError:
        # The declared charset is not known to Python's codec registry.
        return raw.decode('utf-8', errors='ignore')


def _extract_body(email):
    """Return the text of the most useful body part of *email*, or None.

    Single-part messages are decoded directly. For multipart messages the
    first ``text/html`` part is preferred, then the first ``text/plain``
    part; parts marked as attachments are skipped.
    """
    if not email.is_multipart():
        return _decode_part(email)

    html_body = None
    text_body = None
    for part in email.walk():
        # Containers carry no text of their own; attachments are not the body.
        if part.is_multipart() or part.get_content_disposition() == 'attachment':
            continue
        content_type = part.get_content_type()
        if content_type == 'text/html' and html_body is None:
            html_body = _decode_part(part)
        elif content_type == 'text/plain' and text_body is None:
            text_body = _decode_part(part)
    return html_body if html_body is not None else text_body


def save_email_as_markdown(email, index, output_subdir):
    """Write one email message to a Markdown file inside *output_subdir*.

    The file is named ``<subject> - <sender> - <first recipient> - <date>.md``
    (each piece sanitized) and contains a heading with the subject, the
    Date/From/To headers, and the message body converted with ``markdownify``.

    Args:
        email: The message to convert (an ``email.message.Message``-like
            object, as yielded when iterating a ``mailbox.mbox``).
        index: Zero-based position of the message; used only in the log line.
        output_subdir: Existing directory in which the file is created.

    Raises:
        OSError: If the output file cannot be created or written.
    """
    # Header values may not be plain str (e.g. email.header.Header objects for
    # raw 8-bit headers), so coerce before calling str methods on them.
    subject = str(email.get('subject', 'No Subject'))
    date = str(email.get('date', 'No Date'))
    sender = str(email.get('from', 'Unknown Sender'))
    recipients = str(email.get('to', 'Unknown Recipient'))

    # Sanitize and format the filename
    sanitized_subject = sanitize_filename(subject)
    # Keep only the display name of the sender (text before "<address>").
    sanitized_sender = sanitize_filename(sender.split('<')[0].strip())
    # Only the first recipient goes into the file name.
    sanitized_recipients = sanitize_filename(recipients.split(',')[0].strip())
    formatted_date = format_date(date)
    filename = f"{sanitized_subject} - {sanitized_sender} - {sanitized_recipients} - {formatted_date}.md"
    filename = os.path.join(output_subdir, filename)

    # get_payload(decode=True) returns None for multipart messages, so the
    # body is located by walking the MIME tree instead of reading the root.
    try:
        body = _extract_body(email)
        if body is None:
            body_markdown = "No content available"
        else:
            body_markdown = markdownify(body)
    except (UnicodeDecodeError, AttributeError) as e:
        body_markdown = f"Error decoding content: {e}"

    # Create a Markdown file for each email
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(f'# {subject}\n')
        file.write(f'*Date: {date}*\n')
        file.write(f'*From: {sender}*\n')
        file.write(f'*To: {recipients}*\n\n')
        file.write(body_markdown)

    logger.info(f"Saved email {index + 1} as Markdown: {filename}")
2024-07-19 10:44:15 +00:00
def convert_mbox_to_markdown(mbox_file):
    """Convert every message in *mbox_file* to a Markdown file.

    Output goes to ``<output_dir>/<mbox file name without extension>/``,
    which is created if needed. A message that fails to convert is logged
    with its traceback and skipped, so one bad message does not abort the
    rest of the mailbox.

    Args:
        mbox_file: Path to the .mbox file to read.
    """
    # Create a subdirectory in the output directory with the name of the .mbox file
    base_name = os.path.basename(mbox_file)
    subdir_name = os.path.splitext(base_name)[0]  # Remove the .mbox extension
    output_subdir = os.path.join(output_dir, subdir_name)
    os.makedirs(output_subdir, exist_ok=True)

    logger.info(f"Processing .mbox file: {mbox_file}")
    mbox = mailbox.mbox(mbox_file)
    try:
        # Show progress bar
        total_emails = len(mbox)
        logger.info(f"Total emails to process: {total_emails}")
        for i, email in tqdm(enumerate(mbox), total=total_emails, desc='Converting emails'):
            try:
                save_email_as_markdown(email, i, output_subdir)
            except Exception:
                # Per-message boundary: record the failure and keep going.
                logger.exception(f"Failed to convert email {i + 1} from {mbox_file}")
    finally:
        # Release the file handle held by the mailbox object.
        mbox.close()
class MboxFileHandler(FileSystemEventHandler):
    """Watchdog handler that converts each newly created .mbox file."""

    def on_created(self, event):
        """Convert the file behind *event* if it is a .mbox file.

        Directory events and files with other extensions are ignored. The
        extension check is case-insensitive, so ``.MBOX`` is accepted too.
        Conversion errors are logged rather than raised, because an
        exception raised here would escape into the thread that watchdog
        uses to call the handler.
        """
        if event.is_directory:
            return
        if not event.src_path.lower().endswith('.mbox'):
            return

        logger.info(f"New .mbox file detected: {event.src_path}")
        # NOTE(review): the creation event may arrive before the producer has
        # finished writing the file -- confirm that files are moved into
        # place complete, otherwise a partial mailbox could be converted.
        try:
            convert_mbox_to_markdown(event.src_path)
        except Exception:
            logger.exception(f"Failed to convert .mbox file: {event.src_path}")
def start_watching():
    """Watch ``input_dir`` for new .mbox files until interrupted.

    Starts a watchdog observer (non-recursive) with an ``MboxFileHandler``
    and blocks the calling thread. The wait uses ``observer.join`` with a
    timeout instead of a busy loop, so the process does not spin a CPU core
    while idle. On Ctrl-C, or if the observer thread ends by itself, the
    observer is stopped and joined before returning.
    """
    event_handler = MboxFileHandler()
    observer = Observer()
    observer.schedule(event_handler, path=input_dir, recursive=False)
    observer.start()
    logger.info(f"Watching for new .mbox files in {input_dir}...")
    try:
        # Keep the script running; wake up once a second so that a
        # KeyboardInterrupt is handled promptly.
        while observer.is_alive():
            observer.join(timeout=1)
    except KeyboardInterrupt:
        logger.info("Interrupted, stopping watcher...")
    finally:
        observer.stop()
        observer.join()


# Start the watcher only when run as a script, not when imported.
if __name__ == "__main__":
    start_watching()
2024-07-19 10:44:15 +00:00