First version ok for both PY2 and PY3

pull/2118/head
nicolargo 2022-08-08 10:59:00 +02:00
parent 296c3fd2a0
commit 7c5bbe6c5a
4 changed files with 46 additions and 43 deletions

View File

@ -181,9 +181,9 @@ public_ip_disabled=False
# Need to create an aacount: https://censys.io/login
censys_url=https://search.censys.io/api
# Get your own credential here: https://search.censys.io/account/api
# Enter and uncomment the following lines to use your credentials
censys_username=13718bd6-b0c4-4c21-ae63-2d13da0b2dd8
censys_password=hnjdSOukHM84tgx1Oo9Zzv4Pmz2E5bXH
# Enter your creadential and uncomment the following lines
#censys_username=<censys_api_id>
#censys_password=<censys_secret>
# List of fields to be displayed in user interface (comma separated)
censys_fields=location:continent,location:country,autonomous_system:name

View File

@ -345,3 +345,8 @@ def pretty_date(time=False):
if day_diff < 365:
return str(day_diff // 30) + " months"
return str(day_diff // 365) + " years"
def urlopen_auth(url, username, password):
"""Open a url with basic auth"""
return urlopen(Request(url, headers={'Authorization': 'Basic ' + base64.b64encode(('%s:%s' % (username, password)).encode()).decode()}))

View File

@ -10,10 +10,9 @@
"""IP plugin."""
import threading
import urllib
from json import loads
from glances.compat import iterkeys, urlopen, queue
from glances.compat import iterkeys, urlopen, queue, urlopen_auth
from glances.logger import logger
from glances.timer import Timer, getTimeSinceLastUpdate
from glances.plugins.glances_plugin import GlancesPlugin
@ -72,6 +71,12 @@ class Plugin(GlancesPlugin):
self.censys_username = self.get_conf_value("censys_username", default=[None])[0]
self.censys_password = self.get_conf_value("censys_password", default=[None])[0]
self.censys_fields = self.get_conf_value("censys_fields", default=[None])
self.public_info_disabled = (
self.censys_url is None
or self.censys_username is None
or self.censys_password is None
or self.censys_fields is None
)
@GlancesPlugin._check_decorator
@GlancesPlugin._log_result_decorator
@ -95,6 +100,7 @@ class Plugin(GlancesPlugin):
stats['gateway'] = default_gw[0]
# Then the private IP address
# If multiple IP addresses are available, only the one with the default gateway is returned
try:
address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
@ -106,30 +112,21 @@ class Plugin(GlancesPlugin):
stats['mask'] = mask
stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
# Continue with the public IP address
# Finally with the public IP address
time_since_update = getTimeSinceLastUpdate('public-ip')
try:
if not self.public_ip_disabled and (
self.stats.get('address') != address
or time_since_update > self.public_address_refresh_interval
self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
):
self.public_address = PublicIpAddress().get()
if not self.public_info_disabled:
self.public_info = PublicIpInfo(
self.public_address, self.censys_url, self.censys_username, self.censys_password
).get()
except (KeyError, AttributeError) as e:
logger.debug("Cannot grab public IP address ({})".format(e))
logger.debug("Cannot grab public IP information ({})".format(e))
else:
stats['public_address'] = self.public_address
# Finally the Censys information
if (
self.public_address
and not self.public_ip_disabled
and (self.stats.get('address') != address
or time_since_update > self.public_address_refresh_interval)
):
self.public_info = PublicIpInfo(self.public_address,
self.censys_url,
self.censys_username,
self.censys_password).get()
stats['public_info'] = self.public_info
elif self.input_method == 'snmp':
@ -163,6 +160,8 @@ class Plugin(GlancesPlugin):
# Build the string message
msg = ' - '
ret.append(self.curse_add_line(msg))
# Start with the private IP information
msg = 'IP '
ret.append(self.curse_add_line(msg, 'TITLE'))
if 'address' in self.stats:
@ -172,6 +171,8 @@ class Plugin(GlancesPlugin):
# VPN with no internet access (issue #842)
msg = '/{}'.format(self.stats['mask_cidr'])
ret.append(self.curse_add_line(msg))
# Then with the public IP information
try:
msg_pub = '{}'.format(self.stats['public_address'])
except (UnicodeEncodeError, KeyError):
@ -182,14 +183,21 @@ class Plugin(GlancesPlugin):
msg = ' Pub '
ret.append(self.curse_add_line(msg, 'TITLE'))
ret.append(self.curse_add_line(msg_pub))
if 'public_info' in self.stats:
for f in self.censys_fields:
field = f.split(':')
if len(field) == 1 and field[0] in self.stats['public_info']:
msg = '{}'.format(self.stats['public_info'][field[0]])
elif len(field) == 2 and field[0] in self.stats['public_info'] and field[1] in self.stats['public_info'][field[0]]:
msg = '{}'.format(self.stats['public_info'][field[0]][field[1]])
ret.append(self.curse_add_line(msg))
if 'public_info' in self.stats and self.stats['public_info']:
field_result = []
for f in self.censys_fields:
field = f.split(':')
if len(field) == 1 and field[0] in self.stats['public_info']:
field_result.append('{}'.format(self.stats['public_info'][field[0]]))
elif (
len(field) == 2
and field[0] in self.stats['public_info']
and field[1] in self.stats['public_info'][field[0]]
):
field_result.append('{}'.format(self.stats['public_info'][field[0]][field[1]]))
ret.append(self.curse_add_line(' '))
ret.append(self.curse_add_line('/'.join(field_result)))
return ret
@ -267,11 +275,7 @@ class PublicIpInfo(object):
"""Return the public IP information returned by one of the online service."""
q = queue.Queue()
t = threading.Thread(target=self._get_ip_public_info, args=(q,
self.ip,
self.url,
self.username,
self.password))
t = threading.Thread(target=self._get_ip_public_info, args=(q, self.ip, self.url, self.username, self.password))
t.daemon = True
t.start()
@ -290,13 +294,7 @@ class PublicIpInfo(object):
"""Request the url service and put the result in the queue_target."""
request_url = "{}/v2/hosts/{}".format(url, ip)
try:
# Python 3 code only
# https://stackoverflow.com/questions/24635064/how-to-use-urllib-with-username-password-authentication-in-python-3/24648149#24648149
request = urllib.request.Request(request_url)
base64string = urllib.request.base64.b64encode(bytes('%s:%s' % (username, password), 'ascii'))
request.add_header("Authorization", "Basic %s" % base64string.decode('utf-8'))
result = urllib.request.urlopen(request)
response = result.read()
response = urlopen_auth(request_url, username, password).read()
except Exception as e:
logger.debug("IP plugin - Cannot open URL {} ({})".format(request_url, e))
queue_target.put(None)

View File

@ -18,7 +18,7 @@ import json
import copy
from operator import itemgetter
from glances.compat import iterkeys, itervalues, listkeys, map, mean, nativestr
from glances.compat import iterkeys, itervalues, listkeys, map, mean, nativestr, PY3
from glances.actions import GlancesActions
from glances.history import GlancesHistory
from glances.logger import logger
@ -1138,7 +1138,7 @@ class GlancesPlugin(object):
Do not take into account if trend < significant
"""
ret = '-'
if trend is None:
if trend is None or not PY3:
ret = ' '
elif trend > significant:
ret = unicode_message('ARROW_UP', self.args)