"""Cross-post new feed entries to Mastodon and/or Bluesky.

The script polls ``FEED_URL`` on a fixed interval, publishes every entry
that has not been recorded in ``SEEN_POSTS_FILE`` yet, optionally sends a
status email per entry, and exposes a ``/health`` HTTP endpoint.

All settings are read from environment variables (a ``.env`` file is
loaded via ``load_dotenv()`` at import time).
"""

import html
import logging
import os
import smtplib
import threading
import time
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO
from typing import Dict, Iterable, Optional, Set, Tuple
from urllib.parse import urljoin

import feedparser
import requests
from atproto import Client
from bs4 import BeautifulSoup
from dateutil import parser as date_parser
from dotenv import load_dotenv
from mastodon import Mastodon

load_dotenv()

FEED_URL = os.getenv("FEED_URL")
SEEN_POSTS_FILE = "/data/seen_posts.txt"
MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL")
MASTODON_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")
BSKY_HANDLE = os.getenv("BSKY_IDENTIFIER")
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
# Entries older than this many days are skipped; 0 (the default) or a
# negative value disables the age filter.
MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0))
POST_TARGETS = os.getenv("POST_TARGETS", "both").lower()

_VALID_POST_TARGETS = ("mastodon", "bluesky", "both")

# post_id -> targets that already received the entry during this process.
# Lets a retry after a partial failure skip the networks that succeeded.
# Kept in memory only, so it does not survive a restart.
_DELIVERED_TARGETS: Dict[str, Set[str]] = {}

logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)


class HealthHandler(BaseHTTPRequestHandler):
    """Answer ``GET /health`` with ``200 OK`` and any other path with 404."""

    def do_GET(self):
        """Handle a GET request for the health endpoint."""
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        """Suppress the per-request access log of the base handler."""
        # Parameter name `format` is dictated by BaseHTTPRequestHandler.
        pass


def start_health_server(host: str = "0.0.0.0", port: int = 8000) -> HTTPServer:
    """Start the health-check HTTP server in a daemon thread.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Returns:
        The running ``HTTPServer`` instance.
    """
    server = HTTPServer((host, port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    logger.info("💡 Healthcheck server running on port %s.", port)
    return server


def should_send_email(on_success: bool):
    """Decide whether a status email is due, based on ``EMAIL_MODE``.

    ``EMAIL_MODE=all`` sends for every outcome, ``EMAIL_MODE=errors`` (the
    default) sends only for failures; any other value sends nothing.
    """
    mode = os.getenv("EMAIL_MODE", "errors").lower()
    return (mode == "all") or (mode == "errors" and not on_success)


def generate_email_html(
    status: str, title: str, link: str, error_message: Optional[str] = None
) -> str:
    """Render the HTML body of a status email.

    Args:
        status: ``"success"`` selects the success wording and colours; any
            other value selects the error variant.
        title: Title of the feed entry.
        link: URL of the feed entry.
        error_message: Optional error text; an "Error:" section is added
            only when it is non-empty.

    Returns:
        A complete HTML document as a string.
    """
    success = status == "success"
    color = "#2e7d32" if success else "#d32f2f"
    bg_color = "#f5f5f5" if success else "#fff3f3"
    border_color = "#ccc" if success else "#e57373"
    emoji = "✅" if success else "❌"
    heading = "Post Published" if success else "Error Posting Entry"
    meta = (
        "This is an automated success notification."
        if success
        else "Please check logs or configuration."
    )

    # Title, link and error text originate from the feed / remote services,
    # so they are escaped before being placed into markup.
    safe_title = html.escape(title or "")
    safe_link = html.escape(link or "", quote=True)
    if (link or "").lower().startswith(("http://", "https://")):
        link_html = f'<a href="{safe_link}">{safe_link}</a>'
    else:
        # Never turn a non-HTTP scheme (e.g. javascript:) into a live link.
        link_html = safe_link

    error_html = ""
    if error_message:
        error_html = (
            "<p><strong>Error:</strong></p>\n"
            f'      <pre style="white-space: pre-wrap; color: {color};">'
            f"{html.escape(error_message)}</pre>"
        )

    return f"""<html>
  <body style="font-family: Arial, sans-serif; background-color: {bg_color}; padding: 20px;">
    <div style="background-color: #ffffff; border: 1px solid {border_color}; border-radius: 6px; padding: 20px;">
      <h2 style="color: {color};">{emoji} {heading}</h2>
      <p><strong>Title:</strong><br>{safe_title}</p>
      <p><strong>Link:</strong><br>{link_html}</p>
      {error_html}
      <p style="color: #777777; font-size: 12px;">{meta}</p>
    </div>
  </body>
</html>
"""


def send_status_email(subject, html_content):
    """Send an HTML status email using the ``SMTP_*`` / ``EMAIL_*`` settings.

    Best effort: every failure is logged and swallowed so that a mail
    problem never interrupts feed processing.

    Args:
        subject: Subject line; internal whitespace/newlines are collapsed.
        html_content: HTML body of the message.
    """
    smtp_host = os.getenv("SMTP_HOST")
    email_from = os.getenv("EMAIL_FROM")
    email_to = os.getenv("EMAIL_TO")
    if not (smtp_host and email_from and email_to):
        logger.error(
            "❌ Error sending email: SMTP_HOST, EMAIL_FROM and EMAIL_TO must be set."
        )
        return

    try:
        smtp_port = int(os.getenv("SMTP_PORT", 587))
        smtp_user = os.getenv("SMTP_USER")
        smtp_password = os.getenv("SMTP_PASSWORD")
        # EMAIL_TO may hold several comma-separated addresses; sendmail()
        # needs them as a list, otherwise the string is taken as one address.
        recipients = [addr.strip() for addr in email_to.split(",") if addr.strip()]

        msg = MIMEMultipart("alternative")
        # Collapse newlines: the subject contains the (untrusted) feed title.
        msg["Subject"] = " ".join(str(subject).split())
        msg["From"] = email_from
        msg["To"] = email_to
        msg.attach(MIMEText(html_content, "html"))

        # The timeout keeps a stalled mail server from blocking the main loop.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(email_from, recipients, msg.as_string())
        logger.info("✅ Status email sent successfully.")
    except Exception as e:  # deliberate best-effort boundary
        logger.error("❌ Error sending email: %s", e)


def load_seen_ids():
    """Return the set of entry ids recorded in ``SEEN_POSTS_FILE``.

    The parent directory and the file itself are created when missing.
    Blank lines are ignored.
    """
    directory = os.path.dirname(SEEN_POSTS_FILE)
    if directory:
        os.makedirs(directory, exist_ok=True)
    if not os.path.exists(SEEN_POSTS_FILE):
        with open(SEEN_POSTS_FILE, "w", encoding="utf-8"):
            pass
    with open(SEEN_POSTS_FILE, "r", encoding="utf-8") as f:
        return {line.strip() for line in f if line.strip()}


def save_seen_id(post_id):
    """Append ``post_id`` as one line to ``SEEN_POSTS_FILE``."""
    with open(SEEN_POSTS_FILE, "a", encoding="utf-8") as f:
        f.write(post_id + "\n")


def _format_hashtags(tags: Optional[Iterable[str]]) -> str:
    """Join tags into a ``#tag #tag`` string, dropping empty ones.

    Whitespace inside a tag is removed so that a multi-word tag still
    yields a single hashtag.
    """
    cleaned = ("".join(str(tag).split()) for tag in (tags or ()) if tag)
    return " ".join(f"#{tag}" for tag in cleaned if tag)


def _build_message(title, link, tags) -> str:
    """Build the post text: title, link and (if any) hashtags."""
    message = f"{title}\n\n{link}"
    hashtags = _format_hashtags(tags)
    if hashtags:
        message += f"\n\n{hashtags}"
    return message


def post_to_mastodon(title, link, tags):
    """Publish the entry as a status on the configured Mastodon instance.

    Raises:
        Exception: Whatever the Mastodon client raises on failure.
    """
    mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL)
    mastodon.toot(_build_message(title, link, tags))


def fetch_og_data(url) -> Tuple[Optional[str], Optional[str]]:
    """Fetch ``og:title`` and ``og:image`` from the page at ``url``.

    Returns:
        ``(title, image_url)``; each element is ``None`` when the tag is
        missing, and both are ``None`` when the page cannot be fetched or
        parsed. A relative image URL is resolved against ``url``.
    """
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        og_title = soup.find("meta", property="og:title")
        og_image = soup.find("meta", property="og:image")
        title = og_title["content"] if og_title and og_title.has_attr("content") else None
        image_url = og_image["content"] if og_image and og_image.has_attr("content") else None
        if image_url:
            image_url = urljoin(url, image_url)
        return title, image_url
    except Exception as e:  # best effort: the caller falls back to a plain post
        logger.error("❌ Error fetching OG data: %s", e)
        return None, None


def post_to_bluesky(title, link, tags):
    """Publish the entry on Bluesky, with a link preview when possible.

    A preview card is attempted when the linked page provides both
    ``og:title`` and ``og:image``. If building or sending the preview fails
    for any reason, the entry is posted as plain text instead.

    Raises:
        Exception: Whatever the client raises on login or on the final
            text-only post.
    """
    client = Client()
    client.login(BSKY_HANDLE, BSKY_PASSWORD)
    message = _build_message(title, link, tags)

    # Try rich embed
    try:
        og_title, image_url = fetch_og_data(link)
        if og_title and image_url:
            img_resp = requests.get(image_url, timeout=10)
            img_resp.raise_for_status()
            blob = client.upload_blob(BytesIO(img_resp.content))
            embed = {
                "$type": "app.bsky.embed.external",
                "external": {
                    "uri": link,
                    "title": title,
                    "description": "",
                    "thumb": blob.blob,
                },
            }
            client.send_post(text=message, embed=embed)
            logger.info("✅ Posted to Bluesky with preview.")
            return
    except Exception as e:
        logger.error("❌ Error uploading preview to Bluesky: %s", e)

    # Fallback to text-only post
    client.send_post(text=message)
    logger.info("💡 Posted to Bluesky without preview.")


def extract_post_date(entry):
    """Return the earliest parseable date of a feed entry as aware datetime.

    The fields ``published``, ``updated``, ``date_published``,
    ``date_modified`` and ``pubDate`` are tried. Values without a timezone
    are treated as UTC. When none can be parsed, the current UTC time is
    returned.
    """
    date_fields = [
        entry.get("published"),
        entry.get("updated"),
        entry.get("date_published"),
        entry.get("date_modified"),
        entry.get("pubDate"),
    ]
    dates = []
    for d in date_fields:
        if not d:
            continue
        try:
            dt = date_parser.parse(d)
        except (ValueError, OverflowError, TypeError) as e:
            logger.warning("⚠️ Could not parse date: %s (%s)", d, e)
            continue
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dates.append(dt)
    return min(dates) if dates else datetime.now(timezone.utc)


def _publish_entry(post_id, title, link, tags):
    """Post one entry to every configured target not yet delivered to.

    Targets that succeed are remembered in ``_DELIVERED_TARGETS`` so that,
    if a later target raises, the next attempt does not post a duplicate to
    the ones that already worked.
    """
    delivered = _DELIVERED_TARGETS.setdefault(post_id, set())
    if POST_TARGETS in ("mastodon", "both") and "mastodon" not in delivered:
        post_to_mastodon(title, link, tags)
        delivered.add("mastodon")
        time.sleep(2)
    if POST_TARGETS in ("bluesky", "both") and "bluesky" not in delivered:
        post_to_bluesky(title, link, tags)
        delivered.add("bluesky")
    # Fully delivered: the persistent seen-file takes over from here.
    _DELIVERED_TARGETS.pop(post_id, None)


def main():
    """Run one feed check: publish every new entry and record it as seen.

    Entries are identified by ``id`` (falling back to ``link``). Entries
    already recorded, entries without any identifier, and - when
    ``MAX_POST_AGE_DAYS`` is positive - entries older than that many days
    are skipped. A failure to publish one entry is logged (and optionally
    mailed) without stopping the remaining entries.

    Raises:
        OSError: If the seen-file cannot be read or written. The run is
            aborted in that case so that no further entries are published
            without being recorded.
    """
    if not FEED_URL:
        logger.error("❌ FEED_URL is not configured; nothing to check.")
        return
    if POST_TARGETS not in _VALID_POST_TARGETS:
        # Without this guard entries would be marked as published although
        # no network was actually posted to.
        logger.error(
            "❌ Invalid POST_TARGETS value '%s' (expected one of: %s).",
            POST_TARGETS,
            ", ".join(_VALID_POST_TARGETS),
        )
        return

    seen_ids = load_seen_ids()
    feed = feedparser.parse(FEED_URL)
    if getattr(feed, "bozo", False) and not feed.entries:
        logger.warning("⚠️ Feed could not be parsed: %s", feed.get("bozo_exception"))

    now = datetime.now(timezone.utc)
    # A non-positive MAX_POST_AGE_DAYS means "no age limit"; comparing
    # against `now - 0 days` would otherwise reject every dated entry.
    cutoff = now - timedelta(days=MAX_POST_AGE_DAYS) if MAX_POST_AGE_DAYS > 0 else None

    for entry in feed.entries:
        post_id = entry.get("id") or entry.get("link")
        if not post_id:
            logger.warning("⚠️ Skipping entry without id or link.")
            continue
        if post_id in seen_ids:
            continue

        if cutoff is not None and extract_post_date(entry) < cutoff:
            logger.info("⏩ Skipping old post (%s+ days): %s", MAX_POST_AGE_DAYS, post_id)
            continue

        title = entry.get("title", "").strip()
        link = entry.get("link", "").strip()
        tags = [tag["term"] for tag in entry.get("tags", []) if tag.get("term")]

        logger.info("💡 New post found: %s", title)

        try:
            _publish_entry(post_id, title, link, tags)
        except Exception as e:
            logger.error("❌ Posting failed: %s", e)
            if should_send_email(on_success=False):
                send_status_email(
                    f"❌ Error posting: {title}",
                    generate_email_html("error", title, link, str(e)),
                )
        else:
            # Also update the in-memory set so that a duplicate id later in
            # the same feed is not published twice.
            seen_ids.add(post_id)
            save_seen_id(post_id)
            logger.info("✅ Post successfully published.")
            if should_send_email(on_success=True):
                send_status_email(
                    f"✅ Post published: {title}",
                    generate_email_html("success", title, link),
                )

        time.sleep(5)


if __name__ == "__main__":
    INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30))
    logger.info("🔁 Starting feed check every %s minutes.", INTERVAL_MINUTES)
    start_health_server()
    while True:
        try:
            main()
        except Exception as e:
            # Top-level boundary: keep the loop alive, but keep the traceback.
            logger.exception("Unhandled error during execution: %s", e)
        logger.info("⏳ Waiting %s minutes until next run...", INTERVAL_MINUTES)
        time.sleep(INTERVAL_MINUTES * 60)