CSS : Various of modifications and several fixes;

Python : Improve caching system;
Python : Support XEP-0292: vCard4 Over XMPP (Thank you. Marvin W);
SVG    : Add new icons, including characters as temporary placeholders (Thank you. Tigase);
TOML   : Add more systems and modify properties of clients;
XHTML  : Improve design and add a new page of extended vcard.
This commit is contained in:
Schimon Jehudah, Adv. 2024-10-30 22:04:05 +02:00
parent 8675f295e6
commit 373b7b1f05
24 changed files with 2980 additions and 372 deletions

688
fasi.py
View file

@ -3,6 +3,7 @@
from asyncio import TimeoutError
from datetime import datetime
from dateutil import parser
from email.utils import parseaddr
from fastapi import FastAPI, Form, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
@ -11,7 +12,6 @@ from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import glob
#import logging
#from os import mkdir
#from os.path import getsize, exists
import os
import qrcode
@ -215,9 +215,118 @@ class HttpInstance:
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
# NOTE Was /b/
@self.app.get('/c/{jid}')
async def c_jid_get(request: Request, jid):
"""Display entries of a vCard4"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
if jid_bare == jabber_id:
raise HTTPException(status_code=403, detail='access-denied')
node_name_vcard4 = 'urn:xmpp:vcard4'
item_id_vcard4 = 'current'
#try:
if True:
entries = []
exception = jid_vcard = note = node_items = node_note = \
number_of_pages = page_number = previous = selection = \
title = None
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4)
filename = directory + item_id_vcard4 + '.xml'
if os.path.exists(filename) and os.path.getsize(filename) > 0:
jid_details = Data.open_file_xml(filename)
else:
await FileUtilities.cache_vcard_data(
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
xml_data = Data.open_file_xml(filename)
root_element = xml_data.getroot()
child_element = root_element[0]
#vcard_info = Syndication.extract_vcard_items(child_element)
vcard_info = Syndication.extract_vcard4_items(child_element)
# Action and instance type
action = 'Profile'
# Query URI links
print('Query URI links')
jid_kind = 'account'
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name_vcard4)
# Graphic files
#filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)
#except Exception as e:
else:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_tex = node_note = \
node_title = number_of_pages = page_number = previous = \
selection = url = None
if 'fn' in vcard_info and vcard_info['fn']:
title = vcard_info['fn']
elif 'alias' in vcard_info and vcard_info['alias']:
title = vcard_info['alias']
else:
title = jid_bare.split('@')[0]
if 'alias' in vcard_info and vcard_info['alias']:
alias = vcard_info['alias']
else:
alias = jid_bare.split('@')[0]
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'vcard.xhtml'
template_dict = {
'action' : action,
'alias' : alias,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'entries' : entries,
'exception' : exception,
#'filename' : filename,
'jid_bare' : jid,
'jid_note' : note,
#'jid_title' : title,
#'node_title' : node_title,
'node_name' : node_name_vcard4,
'number_of_pages' : number_of_pages,
'page_number' : page_number,
'previous' : previous,
'request' : request,
'title' : title,
'url' : request.url._url,
'vcard_info' : vcard_info,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/b/{jid}')
async def b_jid_get(request: Request, jid):
response = await browse_jid_node_get(request, jid, 'urn:xmpp:microblog:0')
return response
# TODO Change to /p/ for pubsub
@self.app.get('/d/{jid}/{node_name}')
@self.app.get('/d/{jid}/{node_name}/{item_id}')
async def d_jid_node_get(request: Request, jid, node_name, item_id=None):
response = await browse_jid_node_get(request, jid, node_name, item_id=None)
return response
async def browse_jid_node_get(request: Request, jid, node_name, item_id=None):
"""Browse items of a pubsub node"""
jid_path = urlsplit(jid).path
@ -232,9 +341,8 @@ class HttpInstance:
#try:
if True:
entries = []
exception = jid_vcard = note = node_note = number_of_pages = \
page_number = previous = selection = None
exception = jid_vcard = note = node_items = node_note = \
number_of_pages = page_number = previous = selection = None
filename = 'details/{}.toml'.format(jid_bare)
if os.path.exists(filename) and os.path.getsize(filename) > 0:
@ -243,30 +351,20 @@ class HttpInstance:
jid_details = await FileUtilities.cache_jid_data(
jabber_id, password, jid_bare, node_name, item_id)
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# Node item IDs
nodes = jid_details['nodes']
items = jid_details['items']
if node_name not in nodes:
nodes[node_name] = {}
node_item_ids = await XmppXep0060.get_node_item_ids(
xmpp_instance, jid_bare, node_name)
#node_item_ids = await XmppUtilities.get_item_ids_of_node(
# jabber_id, password, jid_bare, node_name, nodes)
if isinstance(node_item_ids['iq'], stanza.iq.Iq):
iq = node_item_ids['iq']
iq_disco_items_items = iq['disco_items']['items']
for item in items:
if item[1] == node_name:
nodes[node_name]['title'] = item[2]
break
nodes[node_name]['count'] = len(iq_disco_items_items)
nodes[node_name]['item_ids'] = []
for item in iq_disco_items_items:
nodes[node_name]['item_ids'].append(
[item[0] or '', item[1] or '', item[2] or ''])
#items = jid_details['items']
# for item in items:
# if item[1] == node_name:
# nodes[node_name]['title'] = item[2]
# break
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
if not os.path.exists(directory):
os.mkdir(directory)
await FileUtilities.cache_node_data(
jabber_id, password, jid_bare, node_name)
count = jid_details['count']
jid_info = {
@ -279,7 +377,8 @@ class HttpInstance:
'note' : jid_details['note'],
'type' : jid_details['image_type']}
messages = jid_details['messages']
node_title = nodes[node_name]['title']
#node_title = nodes[node_name]['title'] if 'title' in nodes[node_name] else jid_details['name']
node_title = node_name
note = jid_details['note']
#title = nodes[node_name]['title'] if node_name else jid_details['name']
title = jid_details['name']
@ -290,50 +389,36 @@ class HttpInstance:
#xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
# Node items
if item_id:
previous = True
node_items = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name, item_ids=[item_id])
else:
item_ids = []
for item in nodes[node_name]['item_ids']:
item_ids.append(item[2])
# NOTE Consider to neglect the reversal of order, because, then, items can be found at the same page.
item_ids.reverse()
page_number = request.query_params.get('page', '')
if page_number:
try:
page_number = int(page_number)
ix = (page_number -1) * 10
except:
ix = 0
page_number = 1
else:
entries = []
node_items = os.listdir(directory)
if 'urn:xmpp:avatar:metadata.xml' in node_items:
node_items.remove('urn:xmpp:avatar:metadata.xml')
page_number = request.query_params.get('page', '')
if page_number:
try:
page_number = int(page_number)
ix = (page_number -1) * 10
except:
ix = 0
page_number = 1
item_ids_10 = item_ids[ix:][:10]
number_of_pages = int(len(item_ids) / 10)
if number_of_pages < len(item_ids) / 10: number_of_pages += 1
node_items = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name, item_ids=item_ids_10)
if isinstance(node_items['iq'], stanza.iq.Iq):
#title = title or node_name
if not node_title: node_title = node_name
#node_note = nodes[node_name]['title'] if node_name else jid_details['name']
#node_note = xmpp_uri # jid_bare
iq = node_items['iq']
for item in iq['pubsub']['items']:
item_payload = item['payload']
entry = Syndication.extract_items(item_payload)
if entry: entry['id'] = item['id']
entries.append(entry)
#if len(entries) > 10: break
else:
message = '{}: {} (XEP-0060)'.format(node_items['condition'], node_items['text'])
if entries: entries.reverse()
xmpp_instance.disconnect()
ix = 0
page_number = 1
item_ids_10 = node_items[ix:][:10]
number_of_pages = int(len(node_items) / 10)
if number_of_pages < len(node_items) / 10: number_of_pages += 1
if node_items:
for item in item_ids_10:
filename = directory + item
xml_data = Data.open_file_xml(filename)
root_element = xml_data.getroot()
child_element = root_element[0]
entry = Syndication.extract_atom_items(child_element)
if entry:
filename_without_file_extension = item[:len(item)-4]
entry['id'] = filename_without_file_extension
entries.append(entry)
#if len(entries) > 10: break
if jid_kind:
# Action and instance type
@ -540,9 +625,9 @@ class HttpInstance:
#try:
if True:
action = count = exception = instance = jid_vcard = \
jid_info = link_href = message = note = selection = title = \
view_href = xmpp_uri = None
action = alias = count_item = count_message = exception = \
instance = jid_vcard = jid_info = link_href = message = note = \
selection = title = vcard4 = view_href = xmpp_uri = None
#node_name = 'urn:xmpp:microblog:0'
filename = 'details/{}.toml'.format(jid_bare)
@ -555,19 +640,13 @@ class HttpInstance:
# Set node name to 'urn:xmpp:microblog:0'
jid_kind = jid_details['kind']
nodes = jid_details['nodes']
count_message = jid_details['messages']
if (jid_kind not in ('conference', 'mix', 'muc') and
'@' in jid_bare and
not node_name and
'urn:xmpp:microblog:0' in nodes):
node_name = 'urn:xmpp:microblog:0'
if ('@' in jid_bare and
'urn:xmpp:microblog:0' not in nodes and
jid_kind not in ('conference', 'mix', 'muc')):
count = 0
else:
count = nodes[node_name]['count'] if node_name in nodes else jid_details['count']
items = jid_details['items']
jid_info = {
'error' : jid_details['error'],
@ -581,20 +660,49 @@ class HttpInstance:
#note = nodes[node_name]['title'] if node_name in nodes else jid_details['note']
note = jid_details['note']
# TODO Append results to file
# vCard4
node_name_vcard4 = 'urn:xmpp:vcard4'
item_id_vcard4 = 'current'
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4)
filename = directory + item_id_vcard4 + '.xml'
if os.path.exists(filename) and os.path.getsize(filename) > 0:
xml_data = Data.open_file_xml(filename)
root_element = xml_data.getroot()
child_element = root_element[0]
#vcard_info = Syndication.extract_vcard_items(child_element)
vcard_info = Syndication.extract_vcard4_items(child_element)
title = vcard_info['fn']
alias = vcard_info['alias']
note = vcard_info['note']
else:
await FileUtilities.cache_vcard_data(
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
if os.path.exists(filename) and os.path.getsize(filename) > 0:
vcard4 = True
# Node item IDs
if node_name not in nodes:
nodes[node_name] = await XmppUtilities.get_item_ids_of_node(
jabber_id, password, jid_bare, node_name, nodes)
if isinstance(nodes[node_name]['iq'], stanza.iq.Iq):
iq = nodes[node_name]['iq']
iq_disco_items = iq['disco_items']
if iq_disco_items['items']:
count = len(nodes[node_name]['iq']['disco_items']['items'])
else:
count = 0
else:
count = 0
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
if not os.path.exists(directory):
os.mkdir(directory)
await FileUtilities.cache_node_data(
jabber_id, password, jid_bare, node_name)
# Node items
entries = []
node_items = os.listdir(directory)
if 'urn:xmpp:avatar:metadata.xml' in node_items:
node_items.remove('urn:xmpp:avatar:metadata.xml')
count_item = len(node_items)
# if ('@' in jid_bare and
# 'urn:xmpp:microblog:0' not in nodes and
# jid_kind not in ('conference', 'mix', 'muc')):
# count_item = 0
# else:
# count_item = len(node_items)
if jid_kind == 'pubsub' and node_name:
items = jid_details['items']
@ -616,7 +724,7 @@ class HttpInstance:
else: # jid_info['error']
action = 'Contact'
instance = view_href = ''
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
if jid_info['condition']: message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
xmpp_uri = jid_bare
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
@ -635,8 +743,12 @@ class HttpInstance:
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = jid
count = filename = jid_bare = jid_vcard = jid_kind = links = \
message = selection = url = None
alias = count_item = count_message = filename = jid_bare = \
jid_vcard = jid_kind = links = message = selection = url = \
vcard4 = None
note_500 = note[:500]
note = note_500 + '' if note_500 < note else note_500
# NOTE Handling of variables "title" and "note" in case of '/j/{jid}/{node_name}' is confusing.
# TODO Add new keys that are of 'node' and be utilized for nodes, instead of reusing a variable for several roles.
@ -645,10 +757,12 @@ class HttpInstance:
template_file = 'jid.xhtml'
template_dict = {
'action' : action,
'alias' : alias,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'count' : count,
'count_item' : count_item,
'count_message' : count_message,
'instance' : instance,
'exception' : exception,
'filename' : filename,
@ -656,11 +770,13 @@ class HttpInstance:
'jid_kind' : jid_kind,
'links' : links,
'message' : message,
'news_client' : news_client,
'note' : note, # TODO node_note or title of PubSub JID
'request' : request,
'selection' : selection,
'title' : title, # TODO node_title
'url' : request.url._url,
'vcard4' : vcard4,
'view_href' : view_href,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
@ -710,6 +826,8 @@ class HttpInstance:
user_agent = request.headers.get("user-agent")
user_agent_lower = user_agent.lower()
match user_agent_lower:
case _ if 'bsd' in user_agent_lower:
software = 'bsd'
case _ if 'linux' in user_agent_lower:
software = 'linux'
case _ if 'haiku' in user_agent_lower:
@ -722,14 +840,18 @@ class HttpInstance:
software = 'apple'
name = software.title()
if software == 'bsd': name = 'BSD'
if software == 'posix': name = 'POSIX'
if software == 'ubports': name = 'UBports'
if name.endswith('os'): name = name.replace('os', 'OS')
filename_clients = 'clients.toml'
clients = Data.open_file_toml(filename_clients)
client_selection = []
clients_software = 0
for client in clients:
if software in clients[client]:
clients_software += 1
if featured and 'featured' not in clients[client]['properties']:
skipped = True
continue
@ -742,12 +864,15 @@ class HttpInstance:
'resources' : clients[client]['resources'] if 'resources' in clients[client] else ''}
client_selection.append(client_selected)
skipped = False if len(client_selection) == clients_software else True
template_file = 'download.xhtml'
template_dict = {
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'client_selection' : client_selection,
'featured' : featured,
'skipped' : skipped,
'request' : request,
'software' : software,
@ -824,9 +949,81 @@ class Data:
data_as_string = tomli_w.dumps(data)
fn.write(data_as_string)
def open_file_xml(filename: str) -> ET.ElementTree:
data = ET.parse(filename)
return data
def save_to_file(filename: str, data: str) -> None:
with open(filename, 'w') as fn:
fn.write(data)
class FileUtilities:
async def cache_jid_data(jabber_id, password, jid_bare, node_name=None, item_id=None, alias=None):
async def cache_vcard_data(
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4):
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
vcard4_data = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name_vcard4, item_ids=[item_id_vcard4])
xmpp_instance.disconnect()
if vcard4_data:
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(vcard4_data['iq'], stanza.iq.Iq):
iq = vcard4_data['iq']
for item in iq['pubsub']['items']:
filename = directory + item_id_vcard4 + '.xml'
xml_item_as_string = str(item)
Data.save_to_file(filename, xml_item_as_string)
#item_payload = item['payload']
#vcard4_info = Syndication.extract_vcard4_items(item_payload)
async def cache_node_data(
jabber_id, password, jid_bare, node_name):
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
node_items = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name)
xmpp_instance.disconnect()
if node_items:
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(node_items['iq'], stanza.iq.Iq):
iq = node_items['iq']
namespace = '{http://www.w3.org/2005/Atom}'
for item in iq['pubsub']['items']:
item_payload = item['payload']
date_element = item_payload.find(namespace + 'updated')
if not date_element: date_element = item_payload.find(namespace + 'published')
if isinstance(date_element, ET.Element):
date = date_element.text
modification_time = parser.parse(date).timestamp()
filename = directory + item['id'] + '.xml'
xml_item_as_string = str(item)
Data.save_to_file(filename, xml_item_as_string)
if isinstance(date_element, ET.Element):
file_statistics = os.stat(filename)
access_time = file_statistics.st_atime
os.utime(filename, (access_time, modification_time))
#item_payload = item['payload']
#entry = Syndication.extract_atom_items(item_payload)
async def cache_jid_data(
jabber_id, password, jid_bare, node_name=None, item_id=None, alias=None):
iq_disco_items_list = iq_disco_items_items_list = node_note = node_title = title = ''
jid_vcard = {
@ -849,6 +1046,28 @@ class FileUtilities:
jid_info_iq = jid_info['iq']
jid_kind = jid_info['kind']
# Set node name to 'urn:xmpp:microblog:0' if JID is an account
if jid_kind == 'account' and not node_name: node_name = 'urn:xmpp:microblog:0'
# vCard4 data
node_name_vcard4 = 'urn:xmpp:vcard4'
item_id_vcard4 = 'current'
vcard4_data = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name_vcard4, item_ids=[item_id_vcard4])
if vcard4_data:
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(vcard4_data['iq'], stanza.iq.Iq):
iq = vcard4_data['iq']
for item in iq['pubsub']['items']:
filename = directory + item_id_vcard4 + '.xml'
xml_item_as_string = str(item)
Data.save_to_file(filename, xml_item_as_string)
#item_payload = item['payload']
#vcard4_info = Syndication.extract_vcard4_items(item_payload)
# JID info
print('JID info')
# NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
@ -976,6 +1195,38 @@ class FileUtilities:
nodes[node_name]['item_ids'].append(
[item_id[0] or '', item_id[1] or '', item_id[2] or ''])
item_ids = []
for item in nodes[node_name]['item_ids']:
item_ids.append(item[2])
node_items = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name)
if node_items:
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(node_items['iq'], stanza.iq.Iq):
iq = node_items['iq']
namespace = '{http://www.w3.org/2005/Atom}'
for item in iq['pubsub']['items']:
item_payload = item['payload']
date_element = item_payload.find(namespace + 'updated')
if not date_element: date_element = item_payload.find(namespace + 'published')
if isinstance(date_element, ET.Element):
date = date_element.text
modification_time = parser.parse(date).timestamp()
filename = directory + item['id'] + '.xml'
xml_item_as_string = str(item)
Data.save_to_file(filename, xml_item_as_string)
if isinstance(date_element, ET.Element):
file_statistics = os.stat(filename)
access_time = file_statistics.st_atime
os.utime(filename, (access_time, modification_time))
#item_payload = item['payload']
#entry = Syndication.extract_atom_items(item_payload)
xmpp_instance.disconnect()
# Notes
@ -1175,36 +1426,202 @@ class Graphics:
class Syndication:
def extract_items(item_payload, limit=False):
# def extract_vcard_items(xml_data):
# namespace = '{urn:ietf:params:xml:ns:vcard-4.0}'
# title = xml_data.find(namespace + 'title')
#
# entry = {'fn' : content_text,
# 'note' : link_href,
# 'email' : published_text,
# 'impp' : summary_text,
# 'url' : tags}
# return entry
def extract_vcard_items(xml_data):
"""Extracts all items from a vCard XML ElementTree.
Args:
xml_data (ElementTree): The vCard XML as an ElementTree object.
Returns:
dict: A dictionary where keys are item names and values are their text content.
"""
items = {}
for item in xml_data.iter():
# Skip the root element (vcard)
if item.tag == '{urn:ietf:params:xml:ns:vcard-4.0}vcard':
continue
# Extract item name and text content
item_name = item.tag.split('}')[1]
# Check for any direct text content or child elements
item_text = []
if item.text:
item_text.append(item.text)
for child in item:
if child.text:
item_text.append(child.text)
# Join text elements if multiple found
if item_text:
items[item_name] = ' '.join(item_text).strip() # Strip extra spaces
else:
items[item_name] = None
return items
def extract_vcard4_items(xml_data):
namespace = '{urn:ietf:params:xml:ns:vcard-4.0}'
vcard = {}
element_em = xml_data.find(namespace + 'email')
element_fn = xml_data.find(namespace + 'fn')
element_nn = xml_data.find(namespace + 'nickname')
element_nt = xml_data.find(namespace + 'note')
element_og = xml_data.find(namespace + 'org')
element_im = xml_data.find(namespace + 'impp')
element_ul = xml_data.find(namespace + 'url')
if isinstance(element_em, ET.Element):
for i in element_em:
text = i.text
if text:
email = text
break
else:
email = ''
else:
email = ''
if isinstance(element_fn, ET.Element):
for i in element_fn:
text = i.text
if text:
title = text
break
else:
title = ''
else:
title = ''
if isinstance(element_nn, ET.Element):
for i in element_nn:
text = i.text
if text:
alias = text
break
else:
alias = ''
else:
alias = ''
if isinstance(element_nt, ET.Element):
for i in element_nt:
text = i.text
if text:
note = text
break
else:
note = ''
else:
note = ''
if isinstance(element_og, ET.Element):
for i in element_og:
text = i.text
if text:
org = text
break
else:
org = ''
else:
org = ''
if isinstance(element_im, ET.Element):
for i in element_im:
text = i.text
if text:
impp = text
break
else:
impp = ''
else:
impp = ''
if isinstance(element_ul, ET.Element):
for i in element_ul:
text = i.text
if text:
url = text
break
else:
url = ''
else:
url = ''
extra_resources = {
'code' : [],
'gallery' : [],
'journal' : [],
'movim' : [],
'peertube' : [],
}
for res in extra_resources:
try:
count = len(xml_data.findall(namespace + 'group[@name="' + res + '"]/' + namespace + 'x-ablabel'))
except:
breakpoint()
for p in range(count):
position = str(p + 1)
print(res, position)
for i in xml_data.find(namespace + 'group[@name="' + res + '"]/' + namespace + 'x-ablabel[' + position + ']'):
txt = i.text
for i in xml_data.find(namespace + 'group[@name="' + res + '"]/' + namespace + 'url[' + position + ']'):
uri = i.text
extra_resources[res].append({'label' : txt, 'uri' : uri})
vcard[res] = extra_resources[res]
vcard['alias'] = alias
vcard['email'] = email
vcard['fn'] = title
vcard['note'] = note
vcard['org'] = org
vcard['impp'] = impp
vcard['url'] = url
return vcard
def extract_atom_items(xml_data, limit=False):
# NOTE
# `.//` was not needded when node item payload was passed directly.
# Now that item is saved as xml, it is required to use `.//`.
# Perhaps navigating a level down (i.e. to "child"), or removing the root from the file would solve this.
#namespace = './/{http://www.w3.org/2005/Atom}'
namespace = '{http://www.w3.org/2005/Atom}'
title = item_payload.find(namespace + 'title')
links = item_payload.find(namespace + 'link')
title = xml_data.find(namespace + 'title')
links = xml_data.find(namespace + 'link')
if (not isinstance(title, ET.Element) and
not isinstance(links, ET.Element)): return None
title_text = '' if title == None else title.text
link_href = ''
if isinstance(links, ET.Element):
for link in item_payload.findall(namespace + 'link'):
for link in xml_data.findall(namespace + 'link'):
link_href = link.attrib['href'] if 'href' in link.attrib else ''
if link_href: break
contents = item_payload.find(namespace + 'content')
contents = xml_data.find(namespace + 'content')
content_text = ''
if isinstance(contents, ET.Element):
for content in item_payload.findall(namespace + 'content'):
for content in xml_data.findall(namespace + 'content'):
content_text = content.text or ''
if content_text: break
summaries = item_payload.find(namespace + 'summary')
summaries = xml_data.find(namespace + 'summary')
summary_text = ''
if isinstance(summaries, ET.Element):
for summary in item_payload.findall(namespace + 'summary'):
for summary in xml_data.findall(namespace + 'summary'):
summary_text = summary.text or ''
if summary_text: break
published = item_payload.find(namespace + 'published')
published = xml_data.find(namespace + 'published')
published_text = '' if published == None else published.text
categories = item_payload.find(namespace + 'category')
categories = xml_data.find(namespace + 'category')
tags = []
if isinstance(categories, ET.Element):
for category in item_payload.findall(namespace + 'category'):
for category in xml_data.findall(namespace + 'category'):
if 'term' in category.attrib and category.attrib['term']:
category_term = category.attrib['term']
if len(category_term) < 20:
@ -1214,7 +1631,7 @@ class Syndication:
if limit and len(tags) > 4: break
identifier = item_payload.find(namespace + 'id')
identifier = xml_data.find(namespace + 'id')
if identifier and identifier.attrib: print(identifier.attrib)
identifier_text = '' if identifier == None else identifier.text
@ -1231,13 +1648,6 @@ class Syndication:
class XmppUtilities:
async def get_item_ids_of_node(jabber_id, password, jid_bare, node_name, nodes):
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
xmpp_instance.disconnect()
return node_item_ids
def set_action_instance_type(jid_kind, node_name=None):
if jid_kind in ('conference', 'server'):
action = 'Discover'
@ -1363,13 +1773,13 @@ class XmppXep0030:
async def get_jid_items(self, jid_bare):
try:
condition = text = None
condition = text = ''
error = False
iq = await self['xep_0030'].get_items(jid=jid_bare)
except (IqError, IqTimeout) as e:
#logger.warning('Chat type could not be determined for {}'.format(jid_bare))
#logger.error(e)
iq = None
iq = ''
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text'] or 'Error'
@ -1393,7 +1803,7 @@ class XmppXep0030:
jid_kind = None
try:
error = False
condition = text = None
condition = text = ''
iq = await self['xep_0030'].get_info(jid=jid_bare)
iq_disco_info = iq['disco_info']
if iq_disco_info:
@ -1410,17 +1820,8 @@ class XmppXep0030:
'muc_unmoderated' in features or
'muc_unsecured' in features):
jid_kind = 'muc'
else:
elif '@' in jid_bare:
for identity in iq_disco_info['identities']:
if identity[0] == 'pubsub' and identity[1] == 'service':
#if 'http://jabber.org/protocol/pubsub' in features:
#if 'http://jabber.org/protocol/pubsub#access-authorize' in features:
#if 'http://jabber.org/protocol/rsm' in features:
jid_kind = 'pubsub'
break
if identity[0] == 'server' and identity[1] == 'im':
jid_kind = 'server'
break
#if identity[0] == 'pubsub' and identity[1] == 'pep':
if identity[0] == 'account':
#if 'urn:xmpp:bookmarks:1#compat-pep' in features:
@ -1436,14 +1837,25 @@ class XmppXep0030:
break
if identity[0] == 'client' and identity[1] == 'bot':
jid_kind = 'bot'
else:
for identity in iq_disco_info['identities']:
if identity[0] == 'pubsub' and identity[1] == 'service':
#if 'http://jabber.org/protocol/pubsub' in features:
#if 'http://jabber.org/protocol/pubsub#access-authorize' in features:
#if 'http://jabber.org/protocol/rsm' in features:
jid_kind = 'pubsub'
break
if identity[0] == 'server' and identity[1] == 'im':
jid_kind = 'server'
break
#logger.info('Jabber ID: {}\n'
# 'Chat Type: {}'.format(jid_bare, result))
else:
iq = condition = text = None
iq = condition = text = ''
except (IqError, IqTimeout) as e:
#logger.warning('Chat type could not be determined for {}'.format(jid_bare))
#logger.error(e)
iq = None
iq = ''
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text'] or 'Error'
@ -1472,7 +1884,7 @@ class XmppXep0045:
if not seconds: seconds = 864000
try:
error = False
condition = text = None
condition = text = ''
#since = datetime.fromtimestamp(time.time()-seconds)
iq = await self['xep_0045'].join_muc_wait(
jid,
@ -1519,7 +1931,7 @@ class XmppXep0054:
async def get_vcard_data(self, jid_bare):
try:
error = False
condition = text = None
condition = text = ''
iq = await self['xep_0054'].get_vcard(jid_bare)
except (IqError, IqTimeout) as e:
error = True
@ -1530,7 +1942,7 @@ class XmppXep0054:
text = 'Could not retrieve vCard'
else:
text = 'Unknown Error'
iq = None
iq = ''
result = {
'error' : error,
'condition' : condition,
@ -1543,7 +1955,7 @@ class XmppXep0060:
async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
try:
error = False
condition = text = None
condition = text = ''
if max_items:
iq = await self['xep_0060'].get_items(
jid_bare, node_name, timeout=5)
@ -1561,7 +1973,7 @@ class XmppXep0060:
result = iq
except (IqError, IqTimeout) as e:
error = True
iq = None
iq = ''
condition = e.iq['error']['condition']
text = e.iq['error']['text']
if not text:
@ -1579,7 +1991,7 @@ class XmppXep0060:
async def get_node_item_ids(self, jid_bare, node_name):
try:
error = False
condition = text = None
condition = text = ''
iq = await self['xep_0030'].get_items(
jid_bare, node_name)
# Broken. See https://codeberg.org/poezio/slixmpp/issues/3548
@ -1587,7 +1999,7 @@ class XmppXep0060:
# jid_bare, node_name, timeout=5)
except (IqError, IqTimeout) as e:
error = True
iq = None
iq = ''
condition = e.iq['error']['condition']
text = e.iq['error']['text']
if not text: