# Source file: BlueMastoFeed/bluemastofeed.py
# Repository viewer metadata: 339 lines, 13 KiB, Python

import os
import time
import feedparser
import logging
import requests
import threading
import smtplib
import re
import unicodedata
from bs4 import BeautifulSoup
from io import BytesIO
from mastodon import Mastodon
from atproto import Client
from dotenv import load_dotenv
from http.server import HTTPServer, BaseHTTPRequestHandler
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dateutil import parser as date_parser
from datetime import datetime, timezone, timedelta
# Load environment variables (via python-dotenv) before any os.getenv() call below
load_dotenv()

# Configuration -- every value except SEEN_POSTS_FILE is read from the environment
FEED_URL = os.getenv("FEED_URL")
SEEN_POSTS_FILE = "/data/seen_posts.txt"
MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL")
MASTODON_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")
BSKY_HANDLE = os.getenv("BSKY_IDENTIFIER")
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0))
POST_TARGETS = os.getenv("POST_TARGETS", "both").lower()

# Logger setup: root logger, one stream handler, level taken from LOG_LEVEL
logger = logging.getLogger()
log_level = os.getenv("LOG_LEVEL", "INFO").upper()  # Enable DEBUG level via env variable
_resolved_level = getattr(logging, log_level, logging.INFO)
if not isinstance(_resolved_level, int):
    # LOG_LEVEL named some other attribute of the logging module (e.g. a
    # function or format string); fall back instead of crashing in setLevel().
    _resolved_level = logging.INFO
logger.setLevel(_resolved_level)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Configuration overview (INFO level)
logger.info(f"📡 Feed URL: {FEED_URL}")
logger.info(f"📤 Posting targets: {POST_TARGETS}")
logger.info(f"🕒 Max post age: {MAX_POST_AGE_DAYS} days")
logger.info(f"📨 Email mode: {os.getenv('EMAIL_MODE', 'errors')}")

# DEBUG: dump the environment, but never write credentials to the log.
# Any variable whose name contains one of these markers has its value masked.
if logger.isEnabledFor(logging.DEBUG):
    _SECRET_MARKERS = ("PASSWORD", "TOKEN", "SECRET", "KEY")
    _redacted_env = {
        name: "***" if any(marker in name.upper() for marker in _SECRET_MARKERS) else value
        for name, value in os.environ.items()
    }
    logger.debug(f"🛠 Full environment variables: {_redacted_env}")
# Healthcheck server handler
class HealthHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing a single ``/health`` endpoint."""

    def do_GET(self):
        """Answer ``/health`` with 200 and body ``OK``; any other path gets 404."""
        if self.path != "/health":
            self.send_response(404)
            self.end_headers()
            return

        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK")
        logger.debug("🏥 Healthcheck requested and responded OK.")  # DEBUG Healthcheck log

    def log_message(self, format, *args):
        """Suppress the default per-request HTTP logging of the base class."""
        return None
def start_health_server():
    """Start the healthcheck HTTP server on 0.0.0.0:8000 in a background thread.

    The server uses ``HealthHandler`` and runs ``serve_forever`` on a daemon
    thread, so this function returns immediately.
    """
    httpd = HTTPServer(("0.0.0.0", 8000), HealthHandler)
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    logger.info("✨ Healthcheck server running on port 8000.")
# Email helper functions
def should_send_email(on_success: bool):
    """Decide whether a status e-mail should be sent for this outcome.

    The decision is driven by the ``EMAIL_MODE`` environment variable
    (case-insensitive, default ``"errors"``):

    * ``"all"``    -> always send
    * ``"errors"`` -> send only when ``on_success`` is false
    * other values -> never send
    """
    email_mode = os.getenv("EMAIL_MODE", "errors").lower()
    if email_mode == "all":
        return True
    return email_mode == "errors" and not on_success
def generate_email_html(status: str, title: str, link: str, error_message: str = None) -> str:
    """Render the HTML body of a status notification e-mail.

    Args:
        status: ``"success"`` selects the green "Post Published" layout; any
            other value selects the red "Error Posting Entry" layout.
        title: Title of the feed entry (shown verbatim, HTML-escaped).
        link: URL of the feed entry (used as link text and ``href``).
        error_message: Optional error text; when truthy an "Error Details"
            section is added.

    Returns:
        A complete HTML document as a string.
    """
    # Local import keeps this block self-contained.
    from html import escape

    is_success = status == "success"
    color = "#2e7d32" if is_success else "#d32f2f"
    bg_color = "#f5f5f5" if is_success else "#fff3f3"
    border_color = "#ccc" if is_success else "#e57373"
    # NOTE(review): both branches of the original conditional were empty
    # strings -- presumably emoji that got lost; confirm what belongs here.
    emoji = ""
    heading = "Post Published" if is_success else "Error Posting Entry"
    meta = "This is an automated success notification." if is_success else "Please check logs or configuration."

    # title/link come from the remote feed and error_message from arbitrary
    # exceptions, so escape them before embedding them in markup/attributes.
    safe_title = escape(str(title))
    safe_link = escape(str(link))

    error_html = f"""
<p><strong>Error Details:</strong></p>
<div class="error">{escape(str(error_message))}</div>
""" if error_message else ""

    return f"""
<html>
<head>
<style>
body {{ font-family: 'Courier New', monospace; background-color: {bg_color}; color: #333; padding: 20px; }}
.container {{ background-color: #ffffff; border: 1px solid {border_color}; border-radius: 8px; padding: 20px; max-width: 600px; margin: auto; }}
h2 {{ color: {color}; }}
a {{ color: #1a73e8; text-decoration: none; }}
.error {{ font-family: monospace; background-color: #fce4ec; padding: 10px; border-radius: 4px; color: #b71c1c; }}
.meta {{ font-size: 14px; color: #777; }}
</style>
</head>
<body>
<div class="container">
<h2>{emoji} {heading}</h2>
<p><strong>Title:</strong><br>{safe_title}</p>
<p><strong>Link:</strong><br><a href="{safe_link}">{safe_link}</a></p>
{error_html}
<p class="meta">{meta}</p>
</div>
</body>
</html>
"""
def send_status_email(subject, html_content):
    """Send an HTML status e-mail using SMTP settings from the environment.

    Reads SMTP_HOST, SMTP_PORT (default 587), SMTP_USER, SMTP_PASSWORD,
    EMAIL_FROM and EMAIL_TO. The connection is upgraded with STARTTLS before
    logging in. Any exception is logged with its traceback and swallowed, so
    this function never raises.

    Args:
        subject: Subject line of the message.
        html_content: HTML document used as the message body.
    """
    try:
        host = os.getenv("SMTP_HOST")
        port = int(os.getenv("SMTP_PORT", 587))
        user = os.getenv("SMTP_USER")
        password = os.getenv("SMTP_PASSWORD")
        sender = os.getenv("EMAIL_FROM")
        recipient = os.getenv("EMAIL_TO")
        logger.debug(f"📧 Preparing to send email to {recipient} with subject: {subject}")  # DEBUG

        message = MIMEMultipart("alternative")
        for header_name, header_value in (("Subject", subject), ("From", sender), ("To", recipient)):
            message[header_name] = header_value
        message.attach(MIMEText(html_content, "html"))

        with smtplib.SMTP(host, port) as smtp:
            smtp.starttls()
            smtp.login(user, password)
            smtp.sendmail(sender, recipient, message.as_string())
        logger.info("✅ Status email sent successfully.")
    except Exception:
        logger.exception("❌ Error sending email:")  # Full stack trace on error
# Utility functions
def extract_facets_utf8(text: str):
    """Build Bluesky rich-text facets (hashtags and links) for *text*.

    Facet positions are expressed as UTF-8 byte offsets, not character
    offsets, so each regex match span is converted via the encoded prefix.

    Hashtags that lie inside a URL (for example the ``#frag`` part of
    ``https://example.com/a#frag``) are ignored, so a tag facet never
    overlaps a link facet.

    Args:
        text: The full post text.

    Returns:
        A list of facet dicts: all hashtag facets in text order, followed by
        all link facets in text order.
    """
    def get_byte_range(char_start, char_end):
        # Length of the UTF-8 encoded prefix == byte offset of that position.
        byte_start = len(text[:char_start].encode("utf-8"))
        byte_end = len(text[:char_end].encode("utf-8"))
        return byte_start, byte_end

    # Links are located first so hashtag matches can be checked against them.
    link_matches = list(re.finditer(r"https?://[^\s]+", text))
    link_spans = [m.span() for m in link_matches]

    def overlaps_link(char_start, char_end):
        return any(char_start < l_end and char_end > l_start for l_start, l_end in link_spans)

    facets = []

    # Extract hashtags (skipping URL fragments)
    for match in re.finditer(r"#(\w+)", text):
        if overlaps_link(*match.span()):
            continue
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": match.group(1)}],
        })

    # Extract links
    for match in link_matches:
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#link", "uri": match.group(0)}],
        })

    logger.debug(f"🏷 Extracted facets: {facets}")  # DEBUG
    return facets
def load_seen_ids():
    """Return the set of post IDs recorded in ``SEEN_POSTS_FILE``.

    The parent directory and an empty file are created first if they do not
    exist. Each line of the file, stripped of surrounding whitespace, becomes
    one element of the returned set.
    """
    os.makedirs(os.path.dirname(SEEN_POSTS_FILE), exist_ok=True)
    if not os.path.exists(SEEN_POSTS_FILE):
        # Create an empty file so the read below always succeeds.
        with open(SEEN_POSTS_FILE, "w"):
            pass
    with open(SEEN_POSTS_FILE, "r") as handle:
        seen = {raw_line.strip() for raw_line in handle}
    logger.debug(f"🗂 Loaded {len(seen)} seen post IDs.")  # DEBUG
    return seen
def save_seen_id(post_id):
    """Append *post_id* as a new line to ``SEEN_POSTS_FILE``.

    Args:
        post_id: Identifier string of the post that has been processed.
    """
    record = post_id + "\n"
    with open(SEEN_POSTS_FILE, "a") as handle:
        handle.write(record)
    logger.debug(f"📝 Saved post ID as seen: {post_id}")  # DEBUG
def post_to_mastodon(title, link, tags):
    """Publish a status to Mastodon consisting of title, link and hashtags.

    The status text is ``title``, a blank line, ``link`` and -- when *tags*
    is non-empty -- another blank line followed by the space-separated
    ``#tag`` list. Credentials come from ``MASTODON_TOKEN`` and
    ``MASTODON_BASE_URL``.

    Args:
        title: Post title.
        link: Post URL.
        tags: Iterable of tag names without the leading ``#`` (may be empty).
    """
    api = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL)
    hashtags = " ".join(f"#{tag}" for tag in tags or ())
    if hashtags:
        message = f"{title}\n\n{link}\n\n{hashtags}"
    else:
        message = f"{title}\n\n{link}"
    logger.debug(f"🐘 Posting to Mastodon: {message}")  # DEBUG
    api.toot(message)
    logger.info("✅ Posted to Mastodon.")
def fetch_og_data(url):
    """Fetch *url* and read its ``og:title`` and ``og:image`` meta tags.

    Args:
        url: Page to download (10 second timeout).

    Returns:
        ``(title, image_url)``. Either element is ``None`` when the matching
        meta tag or its ``content`` attribute is missing; ``(None, None)`` is
        returned when the request or parsing fails (the error is logged).
    """
    try:
        logger.debug(f"🔍 Fetching OpenGraph data from {url}")  # DEBUG
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        document = BeautifulSoup(response.text, "html.parser")

        def meta_content(og_property):
            # Value of <meta property=... content=...>, or None if absent.
            tag = document.find("meta", property=og_property)
            if tag and tag.has_attr("content"):
                return tag["content"]
            return None

        title = meta_content("og:title")
        image_url = meta_content("og:image")
        if not (title and image_url):
            logger.debug(f"⚠️ OpenGraph data incomplete. Title: {title}, Image: {image_url}")  # DEBUG
        return title, image_url
    except Exception:
        logger.exception("❌ Error fetching OpenGraph data:")
        return None, None
def post_to_bluesky(title, link, tags):
    """Publish a post to Bluesky, with a link-preview card when possible.

    Logs in with ``BSKY_HANDLE`` / ``BSKY_PASSWORD``, builds the text
    (title, blank line, link, optional blank line + ``#tag`` list) and its
    facets. If the linked page exposes both an OpenGraph title and image,
    the image is uploaded as a blob and the post is sent with an
    ``app.bsky.embed.external`` card. If anything in that preview path
    fails, the error is logged and the post is sent again without a card.

    Args:
        title: Post title (also used as the card title).
        link: Post URL.
        tags: Iterable of tag names without the leading ``#`` (may be empty).

    Raises:
        Whatever the login or the final plain ``send_post`` call raises.
    """
    client = Client()
    logger.debug(f"🔑 Logging in to Bluesky as {BSKY_HANDLE}")  # DEBUG
    client.login(BSKY_HANDLE, BSKY_PASSWORD)

    hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
    message = f"{title}\n\n{link}"
    if hashtags:
        message += f"\n\n{hashtags}"
    facets = extract_facets_utf8(message)

    try:
        og_title, image_url = fetch_og_data(link)
        if og_title and image_url:
            logger.debug(f"📸 Attempting to upload preview image from: {image_url}")  # DEBUG
            img_resp = requests.get(image_url, timeout=10)
            img_resp.raise_for_status()
            # upload_blob is documented to take the raw image bytes, so pass
            # the response body directly rather than a file-like wrapper.
            blob = client.upload_blob(img_resp.content)
            # Build the card only once the thumbnail reference exists.
            embed = {
                "$type": "app.bsky.embed.external",
                "external": {
                    "uri": link,
                    "title": title,
                    "description": "",
                    "thumb": blob.blob,
                },
            }
            client.send_post(text=message, embed=embed, facets=facets)
            logger.info("✅ Posted to Bluesky with preview.")
            return
    except Exception:
        # Best effort: a broken preview must not prevent the post itself.
        logger.exception("❌ Error uploading preview to Bluesky:")

    client.send_post(text=message, facets=facets)
    logger.info("💡 Posted to Bluesky without preview.")
def extract_post_date(entry):
    """Return the earliest parseable date of a feed entry as an aware datetime.

    The fields ``published``, ``updated``, ``date_published``,
    ``date_modified`` and ``pubDate`` are examined. Values without timezone
    information are tagged as UTC; values that cannot be parsed are logged as
    a warning and ignored. When no field yields a date, the current UTC time
    is returned.

    Args:
        entry: Mapping-like feed entry providing ``.get(key)``.
    """
    candidates = []
    for field in ("published", "updated", "date_published", "date_modified", "pubDate"):
        raw_value = entry.get(field)
        if not raw_value:
            continue
        try:
            parsed = date_parser.parse(raw_value)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            candidates.append(parsed)
        except Exception as e:
            logger.warning(f"⚠️ Could not parse date: {raw_value} ({e})")

    if candidates:
        chosen_date = min(candidates)
    else:
        chosen_date = datetime.now(timezone.utc)
    logger.debug(f"📅 Extracted post date: {chosen_date}")  # DEBUG
    return chosen_date
def _sanitize_tag(tag):
    """Reduce a feed tag to lowercase ASCII word characters (hashtag-safe)."""
    tag = tag.lower()
    # NFKD splits accented letters into base letter + mark; the ASCII encode
    # with "ignore" then drops the marks (and any other non-ASCII character).
    tag = unicodedata.normalize("NFKD", tag).encode("ascii", "ignore").decode("ascii")
    return re.sub(r"\W+", "", tag)


def _extract_tags(entry):
    """Return the sanitized, non-empty tag names of a feed entry."""
    if "tags" not in entry:
        return []
    raw_tags = [
        tag.get("term") if isinstance(tag, dict) else getattr(tag, "term", None)
        for tag in entry.tags
    ]
    sanitized = (_sanitize_tag(term) for term in raw_tags if term)
    # A tag made only of non-ASCII/non-word characters sanitizes to "", which
    # would otherwise be posted as a bare "#".
    return [tag for tag in sanitized if tag]


def main():
    """Run one feed check: publish every new, recent entry and record it.

    For each entry of ``FEED_URL`` that is not yet in the seen-posts file and
    not older than ``MAX_POST_AGE_DAYS``, the entry is posted to the targets
    selected by ``POST_TARGETS`` ("mastodon", "bluesky" or "both"). On
    success the entry ID is stored and, depending on ``EMAIL_MODE``, a
    success e-mail is sent. On failure the error is logged, an error e-mail
    may be sent, and the entry stays unrecorded so it is retried next run.
    """
    seen_ids = load_seen_ids()
    feed = feedparser.parse(FEED_URL)
    now = datetime.now(timezone.utc)
    max_age = timedelta(days=MAX_POST_AGE_DAYS)
    logger.debug(f"📰 Number of feed entries found: {len(feed.entries)}")  # DEBUG

    for entry in feed.entries:
        post_id = entry.get("id") or entry.get("link")
        logger.debug(f"🆔 Checking post ID: {post_id}")  # DEBUG
        if not post_id:
            # Without an identifier the entry could never be recorded as
            # seen and would be re-posted on every run.
            logger.warning("⚠️ Skipping feed entry without id or link.")
            continue
        if post_id in seen_ids:
            logger.debug(f"⏭️ Post already processed: {post_id}")  # DEBUG
            continue

        post_date = extract_post_date(entry)
        # NOTE(review): with MAX_POST_AGE_DAYS=0 (the default) every entry
        # dated before `now` is skipped -- confirm whether 0 was meant to
        # disable the age limit.
        if post_date < now - max_age:
            logger.info(f"⏩ Skipping old post (older than {MAX_POST_AGE_DAYS} days): {post_id}")
            continue

        title = entry.get("title", "").strip()
        link = entry.get("link", "").strip()
        tags = _extract_tags(entry)
        logger.debug(f"🏷 Extracted tags: {tags}")  # DEBUG
        logger.info(f"💡 New post found: {title}")

        try:
            # NOTE(review): if Mastodon succeeds and Bluesky then fails, the
            # entry is not recorded and Mastodon gets it again next run.
            if POST_TARGETS in ("mastodon", "both"):
                post_to_mastodon(title, link, tags)
                time.sleep(2)
            if POST_TARGETS in ("bluesky", "both"):
                post_to_bluesky(title, link, tags)
            save_seen_id(post_id)
            # Keep the in-memory set in sync so a duplicate ID later in the
            # same feed is not posted twice.
            seen_ids.add(post_id)
            logger.info("✅ Post successfully published.")
            if should_send_email(on_success=True):
                send_status_email(f"✅ Post published: {title}", generate_email_html("success", title, link))
        except Exception as exc:
            logger.exception(f"❌ Posting failed for post: {post_id}")
            if should_send_email(on_success=False):
                # Report the caught exception instance, not the Exception class.
                error_text = f"{type(exc).__name__}: {exc}"
                send_status_email(f"❌ Error posting: {title}", generate_email_html("error", title, link, error_text))
        # Pause between entries before handling the next one.
        time.sleep(5)
if __name__ == "__main__":
    # Script entry point: start the healthcheck server, then check the feed
    # forever, pausing INTERVAL_MINUTES (default 30) between runs.
    INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30))
    logger.info(f"🔁 Starting feed check every {INTERVAL_MINUTES} minutes.")
    start_health_server()
    pause_seconds = INTERVAL_MINUTES * 60
    while True:
        try:
            main()
        except Exception:
            # A failed run must not end the loop; log it and try again later.
            logger.exception("Unhandled error during execution:")
        logger.info(f"⏳ Waiting {INTERVAL_MINUTES} minutes until next run...")
        time.sleep(pause_seconds)