Upload files to ""
This commit is contained in:
parent
666565411c
commit
845f51bd59
476
get_news_entry.py
Normal file
@@ -0,0 +1,476 @@
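"""Fetch news items from a FreshRSS feed, extract a lead image and hashtags per
source via image_utils, stamp the image with a bottom band and the source logo,
and post the result to a Mastodon instance. Runs as an endless loop, pacing
itself more slowly at night (see is_daytime)."""
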
import requests
import image_utils
import os
import io
import logging
import cv2
import imageio
import shutil
import subprocess
import time
import random
import datetime
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
from pathlib import Path
from bs4 import BeautifulSoup
from PIL import Image, ImageDraw, ImageFont


RSS_URL = 'https://fresh.franv.site/i/?a=rss&user=fossilfranv&token=sdfggf456456465xcvxcvxvc&hours=168'
MASTODON_TOKEN = 'rgw-GuWK64KeYU4X1qujykDQPYeBoBNcC65dNW654_E'
MASTODON_HOST = 'https://mast.airdog.site'
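
# Note: RSS_URL embeds a feed token and MASTODON_TOKEN is an API credential. A common
# alternative (a sketch only, not what this script does) is to read them from the
# environment instead of hard-coding them, e.g.:
#   MASTODON_TOKEN = os.environ.get("MASTODON_TOKEN", "")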

search_terms = ["slashdot", "time", "bbc", "cbc", "francetvinfo", "lithub", "theguardian",
                "vancouversun", "techrepublic", "ycombinator", "slashdot", "time", "spiegel",
                "wired", "androidauthority"]


# Define the logo_path as it is going to be modified in the process_news function
logo_path = ""
# Define image_path the same way for convenience.
image_path = "./images/11.jpg"


def post_to_mastodon(source, title, description, link, image_url, hashtags):
    global logo_path
    image_path = "./images/11.jpg"
    load_images_path = "./images/"

    media_ids = []

    # Write image to folder for later retrieval
    if image_url and is_valid_url(image_url):
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
        img_data = requests.get(image_url, headers=headers, timeout=10).content
        with open(image_path, 'wb') as handler:
            handler.write(img_data)

        # Add bottom band and logo to the image; fall back to the default image on failure
        new_image = add_bottom_band_with_logo(image_path, 0.15, (220, 220, 220), logo_path)
        if not new_image:
            new_image = Image.open("./logos/news.jpg")

        new_image.save(image_path)
    else:
        # If no image, just replace 11.jpg with the default image
        temp_image = Image.open("./logos/news.jpg")
        temp_image.save("./images/11.jpg")

    IMG_FILES = [filename for filename in os.listdir(load_images_path) if os.path.isfile(os.path.join(load_images_path, filename))]

    # Originally wanted to post many images but now only 1 image
    for file in IMG_FILES:
        url = f"{MASTODON_HOST}/api/v1/media"
        with open(os.path.join(load_images_path, file), 'rb') as image_file:
            files = {'file': image_file}
            r = requests.post(url, files=files, headers={'Authorization': f'Bearer {MASTODON_TOKEN}'})
        response_json = r.json()
        if r.status_code == 200:
            media_id = response_json['id']
            media_ids.append(media_id)
        else:
            print('Mastodon server problem: could not post image')

    # Compose status_text which, with the images, is the only content posted to Mastodon
    if source and title and description and link:
        status_text = source.upper() + "\n" + "\n" + title.upper() + "\n" + "\n" + " " + description + "\n" + "\n" + link + "\n" + "\n" + str(hashtags)
        data = {
            "status": status_text,
            "media_ids[]": media_ids,
            # "description" and "link" are not standard /api/v1/statuses parameters;
            # the server most likely ignores them.
            "description": description,
            "link": link
        }

        # Post to Mastodon
        url = f"{MASTODON_HOST}/api/v1/statuses"
        r = requests.post(url, data=data, headers={'Authorization': f'Bearer {MASTODON_TOKEN}'})
        json_data = r.json()

    return None
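
# The posting flow above is two Mastodon API calls: upload each image to
# POST /api/v1/media and collect the returned ids, then create the status with
# POST /api/v1/statuses and "media_ids[]" set to those ids. A minimal sketch,
# assuming the same MASTODON_HOST and MASTODON_TOKEN:
#
#   media = requests.post(f"{MASTODON_HOST}/api/v1/media",
#                         files={"file": open("./images/11.jpg", "rb")},
#                         headers={"Authorization": f"Bearer {MASTODON_TOKEN}"}).json()
#   requests.post(f"{MASTODON_HOST}/api/v1/statuses",
#                 data={"status": "example post", "media_ids[]": [media["id"]]},
#                 headers={"Authorization": f"Bearer {MASTODON_TOKEN}"})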


def read_news():
    # Make a request to the RSS feed URL
    response = requests.get(RSS_URL)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Parse the XML content using ElementTree
        root = ET.fromstring(response.content)
        items = list(root.findall('.//item'))
        return items
    else:
        return None
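
# Each <item> in the feed is expected to carry (at least) the elements read by
# get_news() below: <title>, <description>, <link>, <enclosure url="...">,
# <pubDate> and <displaydate>. Example shape (illustrative only):
#
#   <item>
#     <title>Headline</title>
#     <description>&lt;p&gt;Summary...&lt;/p&gt;</description>
#     <link>https://www.example.com/story</link>
#     <enclosure url="https://www.example.com/image.jpg" type="image/jpeg"/>
#     <pubDate>Mon, 01 Jan 2024 12:00:00 +0000</pubDate>
#   </item>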


def get_news(items):

    # Initialize main counter for the loop
    main_counter = 0
    # Iterate over each item element in the XML
    for i, item in enumerate(items):
        # Use XPath to extract the desired information from each item
        title_element = item.find('.//title')
        title = title_element.text if title_element is not None else None

        description_element = item.find('.//description')
        description = description_element.text if description_element is not None else None

        # Strip the HTML from the description and truncate it
        soup = BeautifulSoup(description or "", 'html.parser')
        description = soup.get_text()[:200]

        link_element = item.find('.//link')
        link = link_element.text if link_element is not None else None

        enclosure_element = item.find('.//enclosure')
        enclosure = enclosure_element.get('url') if enclosure_element is not None else None

        media_ids = []

        date_element = item.find('.//pubDate')
        date = date_element.text if date_element is not None else None

        displaydate_element = item.find('.//displaydate')
        displaydate = displaydate_element.text if displaydate_element is not None else None

        # Create a newsInfo object with the extracted information
        newsInfo = {
            'title': title,
            'description': description,
            'link': link,
            'enclosure': enclosure,
            'media_ids': media_ids,
            'date': date,
            'displaydate': displaydate,
            'image_url': None,
            'hashtags': None
            # Add more fields as needed
        }

        # Add line feeds to the console output to make it more legible
        print("\n" * 2)

        # Extract the source from the newsInfo link URL;
        # the source is going to be needed in process_news
        url = newsInfo['link']

        parsed_url = urlparse(url or "")
        found_term = None
        source = None

        # Search for a known source term in the URL's host name
        term_index = 0
        while term_index < len(search_terms) and not found_term:
            term = search_terms[term_index]
            if term in parsed_url.netloc.lower():
                found_term = term
            term_index += 1

        if found_term is not None:
            source = found_term
        else:  # Look in the description
            description = newsInfo['description'][:50].lower()
            for term in search_terms:
                if term in description:
                    found_term = term
                    source = found_term

        # Get the page content for process_news
        try:
            response = requests.get(newsInfo['link'], headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"}, timeout=5)
            print(response)

            if response.status_code == 200:
                page_content = response.text

                if process_news(page_content, source, newsInfo):
                    if not newsInfo['image_url']:
                        newsInfo['image_url'] = None
                    post_to_mastodon(source, newsInfo['title'], newsInfo['description'], newsInfo['link'], newsInfo['image_url'], newsInfo['hashtags'])
        except requests.Timeout:
            continue

        print(newsInfo)
        # Delay posting so as not to overwhelm Mastodon
        if main_counter < 6:
            if is_daytime():
                time.sleep(5)
            else:
                time.sleep(30)
            main_counter += 1
        else:
            main_counter = 0
            if is_daytime():
                time.sleep(300)
            else:
                time.sleep(600)

    # return source, newsInfo
    # input("Press Enter to continue...")
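
# Per-item pacing above: the first six items in a batch are posted with a short
# pause (5 s during the day, 30 s at night); on the seventh item the counter
# resets and the loop waits 300 s / 600 s before continuing, so Mastodon is
# never flooded.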


# Process the news according to source
def process_news(page_content, source, newsInfo):
    global logo_path
    if source == "androidauthority":
        image_url, r_hashtags = image_utils.extract_androidauthority_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "androidauthority.jpg"
        print(source)
        # Modify other fields in newsInfo as needed

    elif source == "bbc":
        image_url, r_hashtags = image_utils.extract_bbc_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "bbc.jpg"

    elif source == "cbc":
        image_url, r_hashtags = image_utils.extract_cbc_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "cbc.jpg"

    elif source == "francetvinfo":
        image_url, r_hashtags = image_utils.extract_francetvinfo_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "franceinfo.jpg"

    elif source == "theguardian":
        image_url, r_hashtags = image_utils.extract_theguardian_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "theguardian.jpg"

    elif source == "vancouversun":
        image_url, r_hashtags = image_utils.extract_vancouver_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "vancouversun.jpg"

    elif source == "techrepublic":
        image_url, r_hashtags = image_utils.extract_techrepublic_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "techrepublic.jpg"

    elif source == "time":
        image_url, r_hashtags = image_utils.extract_time_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "time.jpg"

    elif source == "wired":
        image_url, r_hashtags = image_utils.extract_wired_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "wired.jpg"

    elif source == "slashdot":
        logo_path = "./logos/" + "slashdot.jpg"

    # Not used anymore
    elif source == "ycombinator":
        # The link is in fact in the description
        extract_ycombinator_url(newsInfo)

    elif source == "lithub":
        # The link is in fact in the description
        image_url, r_hashtags = image_utils.extract_lithub_image_url(page_content)
        newsInfo['image_url'] = image_url
        newsInfo['hashtags'] = r_hashtags
        logo_path = "./logos/" + "lithub.jpg"

    else:
        ''' # Handle the case when source is not any of the expected values
        # Extract the correct link from the description field
        description = newsInfo.get('description', '')
        start_index = description.find('<a href="') + len('<a href="')
        end_index = description.find('"', start_index)
        correct_link = description[start_index:end_index]
        print(correct_link)
        # input("Press Enter to continue... Process news")
        # Update the link field in newsInfo with the correct link
        newsInfo['link'] = correct_link '''
        return None

    return newsInfo
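
# Every handled source follows the same pattern: an image_utils helper returns
# (image_url, hashtags) for the page and a matching logo file is selected. A
# table-driven equivalent (sketch only, assuming the same image_utils helpers)
# would look like:
#
#   SOURCE_HANDLERS = {
#       "bbc": (image_utils.extract_bbc_image_url, "./logos/bbc.jpg"),
#       "cbc": (image_utils.extract_cbc_image_url, "./logos/cbc.jpg"),
#       # ...one entry per source...
#   }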


# Not used anymore
def extract_ycombinator_url(newsInfo):

    description = newsInfo['description']

    start_marker = '<a href="'
    end_marker = '"'

    start_index = description.find(start_marker)
    end_index = description.find(end_marker, start_index + len(start_marker))

    if start_index != -1 and end_index != -1:
        url = description[start_index + len(start_marker):end_index].strip()
        newsInfo['link'] = url
    else:
        print("URL not found in the description")


# Sometimes images have multiple layers, I only need 1
def check_layer_count(image_path):
    # Ask ImageMagick how many frames/layers the file contains
    identify_command = ["identify", "-format", "%n", image_path]
    result = subprocess.run(identify_command, capture_output=True, text=True)
    if result.returncode == 0:
        try:
            layer_count = int(result.stdout)
            return layer_count
        except ValueError:
            return 0  # Unable to determine the layer count
    else:
        print("Failed to retrieve image information.")
        return 0
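
# Example: check_layer_count("./images/11.jpg") normally returns 1 for a plain JPEG.
# Caveat (not verified here): `identify -format %n` prints its value once per frame,
# so a multi-frame file may yield e.g. "22" for a 2-frame image; if that matters,
# using `-format "%n\n"` and reading a single line is a more robust variant.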


def add_bottom_band_with_logo(image_path, band_height_ratio, band_color, logo_path):

    if image_path:

        layer_count = check_layer_count(image_path)

        if layer_count > 1:
            # Keep a single, randomly chosen layer and delete all the others
            selected_layer = random.randint(0, layer_count - 1)
            delete_range = ",".join(str(i) for i in range(layer_count) if i != selected_layer)
            conversion_command = [
                "convert",
                image_path,
                "-delete", delete_range,
                "-strip",
                "-colorspace",
                "sRGB",
                "./images/temp.jpg"
            ]
        else:
            # Single layer, proceed with the conversion
            conversion_command = [
                "convert",
                image_path,
                "-strip",
                "-colorspace",
                "sRGB",
                "./images/temp.jpg"
            ]

        result = subprocess.run(conversion_command, stderr=subprocess.PIPE)
        if result.returncode == 0:
            shutil.move("./images/temp.jpg", image_path)
            image_cv2 = cv2.imread(image_path)
        else:
            error_output = result.stderr.decode("utf-8")
            print("Error Output:", error_output)
            image_cv2 = cv2.imread("./logos/news.jpg")

        # Convert BGR to RGB
        image_rgb = cv2.cvtColor(image_cv2, cv2.COLOR_BGR2RGB)

        # Convert the ndarray to a Pillow Image object
        pil_image = Image.fromarray(image_rgb)
        image = pil_image

        band_height = int(image.height * band_height_ratio)

        if logo_path:
            logo = Image.open(logo_path).convert("RGBA")

            # Calculate the desired width and height of the logo based on the band height and the aspect ratio of the logo image
            logo_ratio = logo.width / logo.height
            logo_height = int(band_height * 0.8)  # Adjust the logo height as desired
            logo_width = int(logo_height * logo_ratio)

            # If these values are 0 it indicates a bad image
            if not logo_width or not logo_height:
                return None

            # Resize the logo image to the calculated dimensions
            logo = logo.resize((logo_width, logo_height))

            # Create a new blank image with the required dimensions
            new_image = Image.new(image.mode, (image.width, image.height + band_height), band_color)

            # Paste the original image onto the new image, leaving the band at the bottom
            new_image.paste(image, (0, 0))

            # Calculate the position to paste the logo in the center of the band
            logo_position = ((new_image.width - logo.width) // 2, image.height + int((band_height - logo_height) / 2))

            # Paste the logo image onto the new image in the calculated position
            new_image.paste(logo, logo_position, mask=logo)
            return new_image

    return None
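
# Example call (sketch, mirroring how post_to_mastodon uses it): add a light grey
# band, 15% of the image height, with the BBC logo centred in it.
#   banded = add_bottom_band_with_logo("./images/11.jpg", 0.15, (220, 220, 220), "./logos/bbc.jpg")
#   if banded:
#       banded.save("./images/11.jpg")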


# Sometimes image_url is not valid
def is_valid_url(url):
    if not url.startswith("https://"):
        return False

    try:
        result = urlparse(url)
        if result.scheme and result.netloc:
            response = requests.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"}, timeout=5)
            if response.status_code == 200:
                content_type = response.headers.get("Content-Type")
                if content_type and "image" in content_type:
                    return True
    except (ValueError, requests.RequestException):
        # Malformed URL or network/timeout error: treat the URL as invalid
        pass

    return False
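
# Example: is_valid_url("https://example.com/picture.jpg") is True only when the
# URL is HTTPS, fetchable within 5 seconds and served with an image Content-Type.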


def is_daytime():
    current_time = datetime.datetime.now().time()
    start_time = datetime.time(7, 0)
    end_time = datetime.time(22, 0)

    if start_time <= current_time <= end_time:
        return True
    else:
        return False


def main():

    # Infinite loop reading the RSS feed and repeating the request to refresh
    my_loop = 0
    while True:
        my_loop += 1
        items = read_news()
        print("my loop is now at" + " " + str(my_loop))
        if items:
            # source, newsInfo = get_news(items)
            get_news(items)
            if is_daytime():
                time.sleep(900)
            else:
                time.sleep(1800)
        else:
            time.sleep(900)


if __name__ == '__main__':
    main()