main module
This commit is contained in:
parent 6cf20f90cb
commit f191a1b7f5
418 get_news_entry.py Normal file
@@ -0,0 +1,418 @@
import requests
import image_utils
import os
import cv2
import shutil
import subprocess
import time
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from PIL import Image


RSS_URL = 'https://fresh.franv.site/i/?a=rss&user=fossilfranv&token=sdfggf456456465xcvxcvxvc&hours=168'
MASTODON_TOKEN = 'J65EiYQMpc-hY3CaUJaQPHdXxV7-KiKZjlr0QPESlVQ'
MASTODON_HOST = 'https://mast.airdog.site'

search_terms = ["slashdot", "time", "bbc", "cbc", "francetvinfo", "lithub", "theguardian",
                "vancouversun", "techrepublic", "ycombinator", "spiegel",
                "wired", "androidauthority"]


# Define logo_path at module level, as it is reassigned in the process_news function
logo_path = ""
# Define image_path the same way for convenience
image_path = "/home/franv/mast_bot/images/11.jpg"


def post_to_mastodon(source, title, description, link, image_url, hashtags):
    global logo_path
    image_path = "/home/franv/mast_bot/images/11.jpg"
    load_images_path = "/home/franv/mast_bot/images/"

    media_ids = []

    # Write the article image to the images folder for later retrieval
    if image_url and is_valid_url(image_url):
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
        img_data = requests.get(image_url, headers=headers, timeout=10).content
        with open(image_path, 'wb') as handler:
            handler.write(img_data)

        # Add a bottom band and the source logo to the image
        new_image = add_bottom_band_with_logo(image_path, 0.15, (220, 220, 220), logo_path)
        if not new_image:
            new_image = Image.open("/home/franv/mast_bot/logos/news.jpg")

        new_image.save(image_path)
    else:
        # If there is no image, just replace 11.jpg with the default image
        temp_image = Image.open("/home/franv/mast_bot/logos/news.jpg")
        temp_image.save("/home/franv/mast_bot/images/11.jpg")

    IMG_FILES = [filename for filename in os.listdir(load_images_path) if os.path.isfile(os.path.join(load_images_path, filename))]

    # Originally intended to post several images, but now only one image is posted
    for file in IMG_FILES:
        url = f"{MASTODON_HOST}/api/v1/media"
        with open(os.path.join(load_images_path, file), 'rb') as media_file:
            r = requests.post(url, files={'file': media_file}, headers={'Authorization': f'Bearer {MASTODON_TOKEN}'})
        if r.status_code == 200:
            media_ids.append(r.json()['id'])

    # Compose status_text which, together with the images, is the only content posted to Mastodon
    if source and title and description and link:
        status_text = source.upper() + "\n" + "\n" + title.upper() + "\n" + "\n" + " " + description + "\n" + "\n" + link + "\n" + "\n" + str(hashtags)
        data = {
            "status": status_text,
            "media_ids[]": media_ids,
            "description": description,
            "link": link
        }

        # Post to Mastodon
        url = f"{MASTODON_HOST}/api/v1/statuses"
        r = requests.post(url, data=data, headers={'Authorization': f'Bearer {MASTODON_TOKEN}'})

    return None

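# Note: the flow above is the standard two-step Mastodon API sequence: each
# file is first POSTed to /api/v1/media to obtain a media id, and the status
# text plus the collected media_ids[] is then POSTed to /api/v1/statuses.
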
def read_news():
    # Make a request to the RSS feed URL
    response = requests.get(RSS_URL, timeout=10)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Parse the XML content using ElementTree
        root = ET.fromstring(response.content)
        items = list(root.findall('.//item'))
        return items
    else:
        return None

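# For reference, each <item> element returned by read_news is expected to look
# roughly like this hypothetical example (get_news reads exactly these children):
#
#   <item>
#     <title>Headline text</title>
#     <description>HTML summary of the article</description>
#     <link>https://www.bbc.com/news/some-article</link>
#     <enclosure url="https://example.com/image.jpg" />
#     <pubDate>Mon, 01 Jan 2024 12:00:00 GMT</pubDate>
#     <displaydate>Mon, 01 Jan 2024</displaydate>
#   </item>
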
def get_news(items):

    # Initialize the main counter for the posting-delay logic
    main_counter = 0
    # Iterate over each item element in the XML
    for i, item in enumerate(items):
        # Extract the desired information from each item
        title_element = item.find('.//title')
        title = title_element.text if title_element is not None else None

        description_element = item.find('.//description')
        description = description_element.text if description_element is not None else None

        # Strip HTML tags from the description and truncate it
        if description:
            soup = BeautifulSoup(description, 'html.parser')
            description = soup.get_text()[:250]

        link_element = item.find('.//link')
        link = link_element.text if link_element is not None else None

        enclosure_element = item.find('.//enclosure')
        enclosure = enclosure_element.get('url') if enclosure_element is not None else None

        media_ids = []

        date_element = item.find('.//pubDate')
        date = date_element.text if date_element is not None else None

        displaydate_element = item.find('.//displaydate')
        displaydate = displaydate_element.text if displaydate_element is not None else None

        # Create a newsInfo object with the extracted information
        newsInfo = {
            'title': title,
            'description': description,
            'link': link,
            'enclosure': enclosure,
            'media_ids': media_ids,
            'date': date,
            'displaydate': displaydate,
            'image_url': None,
            'hashtags': None
            # Add more fields as needed
        }

        # Add line feeds to the console output to make it more legible
        print("\n" * 2)

        # Extract the source from the newsInfo['link'] URL;
        # the source is needed in process_news
        url = newsInfo['link']
        if not url:
            continue

        parsed_url = urlparse(url)
        found_term = None
        source = None

        # Search for a known source in the URL's host name
        term_index = 0
        while term_index < len(search_terms) and not found_term:
            term = search_terms[term_index]
            if term in parsed_url.netloc.lower():
                found_term = term
            term_index += 1

        if found_term is not None:
            source = found_term
        elif description:  # Fall back to looking in the description
            lowered = description[:50].lower()
            for term in search_terms:
                if term in lowered:
                    found_term = term
                    source = found_term

        # Fetch the page content for process_news
        try:
            response = requests.get(newsInfo['link'], headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"}, timeout=5)
            print(response)

            if response.status_code == 200:
                page_content = response.text

                if process_news(page_content, source, newsInfo):
                    post_to_mastodon(source, newsInfo['title'], newsInfo['description'], newsInfo['link'], newsInfo['image_url'], newsInfo['hashtags'])
        except requests.Timeout:
            continue

        print(newsInfo)
        # Delay posting so as not to overwhelm Mastodon
        if main_counter < 6:
            time.sleep(30)
            main_counter += 1
        else:
            main_counter = 0
            time.sleep(300)

    # return source, newsInfo

# Process the news according to source
def process_news(page_content, source, newsInfo):
    global logo_path
    if source == "androidauthority":
        image_url, r_hashtags = image_utils.extract_androidauthority_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "androidauthority.jpg"
        print(source)
        # Modify other fields in newsInfo as needed

    elif source == "bbc":
        image_url, r_hashtags = image_utils.extract_bbc_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "bbc.jpg"

    elif source == "cbc":
        image_url, r_hashtags = image_utils.extract_cbc_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "cbc.jpg"

    elif source == "francetvinfo":
        image_url, r_hashtags = image_utils.extract_francetvinfo_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "franceinfo.jpg"

    elif source == "theguardian":
        image_url, r_hashtags = image_utils.extract_theguardian_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "theguardian.jpg"

    elif source == "vancouversun":
        image_url, r_hashtags = image_utils.extract_vancouver_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "vancouversun.jpg"

    elif source == "techrepublic":
        newsInfo['image_url'] = image_utils.extract_techrepublic_image_url(page_content)
        logo_path = "/home/franv/mast_bot/logos/" + "techrepublic.jpg"

    elif source == "time":
        image_url, r_hashtags = image_utils.extract_time_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "/home/franv/mast_bot/logos/" + "time.jpg"

    elif source == "wired":
        newsInfo['image_url'] = image_utils.extract_wired_image_url(page_content)
        logo_path = "/home/franv/mast_bot/logos/" + "wired.jpg"

    elif source == "slashdot":
        logo_path = "/home/franv/mast_bot/logos/" + "slashdot.jpg"

    # Not used anymore
    elif source == "ycombinator":
        # The real link is in fact in the description
        extract_ycombinator_url(newsInfo)

    elif source == "lithub":
        newsInfo['image_url'] = image_utils.extract_lithub_image_url(page_content)
        logo_path = "/home/franv/mast_bot/logos/" + "lithub.jpg"
        print("Lithub image_url:", newsInfo['image_url'])

    else:
        ''' Handle the case when source is not one of the expected values:
        extract the correct link from the description field.
        description = newsInfo.get('description', '')
        start_index = description.find('<a href="') + len('<a href="')
        end_index = description.find('"', start_index)
        correct_link = description[start_index:end_index]
        print(correct_link)
        # Update the link field in newsInfo with the correct link
        newsInfo['link'] = correct_link '''
        return None

    return newsInfo

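# A possible table-driven alternative to the elif chain above; just a sketch,
# not wired in. It assumes each extractor returns (image_url, hashtags) and
# that every logo file is named "<source>.jpg", which is not true for every
# source above (e.g. francetvinfo uses franceinfo.jpg):
#
#   EXTRACTORS = {
#       "bbc": image_utils.extract_bbc_image_url,
#       "cbc": image_utils.extract_cbc_image_url,
#       "time": image_utils.extract_time_image_url,
#       # ...one entry per source
#   }
#
#   def process_news_table(page_content, source, newsInfo):
#       global logo_path
#       extractor = EXTRACTORS.get(source)
#       if extractor is None:
#           return None
#       newsInfo['image_url'], newsInfo['hashtags'] = extractor(page_content)
#       logo_path = "/home/franv/mast_bot/logos/" + source + ".jpg"
#       return newsInfo
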
# Not used anymore
def extract_ycombinator_url(newsInfo):

    description = newsInfo['description']

    start_marker = '<a href="'
    end_marker = '"'

    start_index = description.find(start_marker)
    end_index = description.find(end_marker, start_index + len(start_marker))

    if start_index != -1 and end_index != -1:
        url = description[start_index + len(start_marker):end_index].strip()
        newsInfo['link'] = url
    else:
        print("URL not found in the description")

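# A quick illustrative example (hypothetical input) of what
# extract_ycombinator_url does:
#
#   info = {'description': 'Comments: <a href="https://example.com/story">link</a>'}
#   extract_ycombinator_url(info)
#   # info['link'] is now 'https://example.com/story'
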
def add_bottom_band_with_logo(image_path, band_height_ratio, band_color, logo_path):

    if image_path:

        image_cv2 = cv2.imread(image_path)
        if image_cv2 is None:
            # cv2 could not decode the file; try converting it to a plain sRGB JPEG
            conversion_command = [
                "convert",
                image_path,
                "-strip",
                "-colorspace",
                "sRGB",
                "/home/franv/mast_bot/images/temp.jpg"
            ]
            result = subprocess.run(conversion_command, stderr=subprocess.PIPE)
            if result.returncode == 0:
                shutil.move("/home/franv/mast_bot/images/temp.jpg", image_path)
                image_cv2 = cv2.imread(image_path)
            else:
                error_output = result.stderr.decode("utf-8")
                print("Error Output:", error_output)
                image_cv2 = cv2.imread("/home/franv/mast_bot/logos/news.jpg")

        # Convert BGR to RGB
        image_rgb = cv2.cvtColor(image_cv2, cv2.COLOR_BGR2RGB)

        # Convert the ndarray to a Pillow Image object
        image = Image.fromarray(image_rgb)

        band_height = int(image.height * band_height_ratio)

        if logo_path:
            logo = Image.open(logo_path).convert("RGBA")

            # Calculate the desired width and height of the logo based on the
            # band height and the aspect ratio of the logo image
            logo_ratio = logo.width / logo.height
            logo_height = int(band_height * 0.8)  # Adjust the logo height as desired
            logo_width = int(logo_height * logo_ratio)

            # If these values are 0 it indicates a bad image
            if not logo_width or not logo_height:
                return None

            # Resize the logo image to the calculated dimensions
            logo = logo.resize((logo_width, logo_height))

            # Create a new blank image with the required dimensions
            new_image = Image.new(image.mode, (image.width, image.height + band_height), band_color)

            # Paste the original image onto the new image, leaving the band at the bottom
            new_image.paste(image, (0, 0))

            # Calculate the position to paste the logo in the center of the band
            logo_position = ((new_image.width - logo.width) // 2, image.height + int((band_height - logo_height) / 2))

            # Paste the logo image onto the new image in the calculated position
            new_image.paste(logo, logo_position, mask=logo)
            return new_image
        else:
            return None
    else:
        return None

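# A minimal usage sketch, matching the call in post_to_mastodon above:
#
#   banded = add_bottom_band_with_logo("/home/franv/mast_bot/images/11.jpg",
#                                      0.15, (220, 220, 220),
#                                      "/home/franv/mast_bot/logos/bbc.jpg")
#   if banded:
#       banded.save("/home/franv/mast_bot/images/11.jpg")
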
# Sometimes image_url is not valid
def is_valid_url(url):
    if not url.startswith("https://"):
        return False

    try:
        result = urlparse(url)
        if result.scheme and result.netloc:
            response = requests.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"}, timeout=5)
            if response.status_code == 200:
                content_type = response.headers.get("Content-Type")
                if content_type and "image" in content_type:
                    return True
    except (ValueError, requests.RequestException):
        pass

    return False

def main():

    # Infinite loop: read the RSS feed, then sleep before refreshing
    my_loop = 0
    while True:
        my_loop += 1
        items = read_news()
        print("my loop is now at " + str(my_loop))
        if items:
            # source, newsInfo = get_news(items)
            get_news(items)
            time.sleep(900)
        else:
            time.sleep(900)


if __name__ == '__main__':
    main()