通信人家园

标题: Python天下,一键  [查看完整版帖子] [打印本页]

时间:  2024-10-11 07:25
作者: yanpeil     标题: Python天下,一键

import telnetlib
import time
import re
import pandas as pd
import os
import uuid
import schedule
import glob
import matplotlib.pyplot as plt

def do_telnet():
    host_ip = r'129.60.161.169'
    username = 'zte'
    password = 'zte'
    path = r'E:\JS\实战\omscript-master\\'
    #登录网元

    tn = telnetlib.Telnet()
    tn.open(host_ip, port=23, timeout=5)
    tn.read_until(b'Username:', timeout=5)
    tn.write(username.encode('ascii') + b'\n')
    tn.read_until(b'Password:', timeout=5)
    tn.write(password.encode('ascii') + b'\n')
    time.sleep(1)
    command_result = tn.read_until(b'#', timeout=5)
    if b'#' not in command_result:
        print('%s登录失败' % host_ip)
    else:
        print('%s登录成功' % host_ip)

    #查看端口状态

    command = "show interface brief"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\n')
    time.sleep(1)
    result_list = []
    while (True):
        command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")

        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(0.05)
            continue

    result_str = "\n".join(result_list)
    list_str = result_str.split('\n')

    print(result_str)

    #列出端口为down端口

    pattern = re.compile(r'(\S+)\s+\S+\s+\S+\s+\S+\s+(down|up)\s+\S+\s+\S+')
    down_interfaces = []
    for match in pattern.finditer(command_result):
        interface, admin_state = match.groups()
        if admin_state == 'down':
            down_interfaces.append(interface)
    print(down_interfaces)

    #开启down端口 以xgei-0/2/0/1为例

    command = "conf t"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\r\n')
    run_time = tn.read_until(b'#')
    command = "interface xgei-0/2/0/1"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\r\n')
    run_time = tn.read_until(b'#')
    command = "no shutdown"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\r\n')
    run_time = tn.read_until(b'#')
    command = "end"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\r\n')
    run_time = tn.read_until(b'#')
    command = "exit"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\r\n')
    # print(down_interfaces)

    #查看光功率的配置


    command = "show opticalinfo brief"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\n')
    time.sleep(1)
    result_list = []
    while (True):
        command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")

        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(0.05)
            continue

    result_str = "\n".join(result_list)
    list_str = result_str.split('\n')

    print(result_str)

    #OSPF故障处理

    command = "show clock"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\r\n')
    run_time = tn.read_until(b'#')
    run_time = re.findall(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+2024", run_time.decode('GB18030'))[0]

    command = "show ip ospf neighbor detail"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\n')
    time.sleep(1)

    result_list = []
    while (True):
        command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")

        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(0.05)
            continue

    result_str = "\n".join(result_list)

    dict_ouput = {}

    dict_ouput["host_ip"] = host_ip
    dict_ouput["time"] = run_time

    startpattern = re.compile(r'OSPF Router with ID (.+)')
    strtext = re.search(startpattern, str(result_str)).group(1)
    dict_ouput["OSPF Router with ID"] = strtext

    startpattern = re.compile(r'Neighbor\s+(\d+.\d+.\d+.\d+)')
    strtext = re.search(startpattern, str(result_str)).group(1)
    dict_ouput["Neighbor"] = strtext

    startpattern = re.compile(r'In the area\s+(.+)')
    strtext = re.search(startpattern, str(result_str)).group(1)
    dict_ouput["area"] = strtext

    startpattern = re.compile(r'State\s+(\w+), ')
    strtext = re.search(startpattern, str(result_str)).group(1)
    dict_ouput["State"] = strtext

    pd_output = pd.DataFrame.from_dict([dict_ouput])

    pd_output['time'] = pd.to_datetime(pd_output['time'])
    pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))

    pd_output.to_csv(path+os.sep+r'ospf'+'-'+str(host_ip)+'.csv', index=None, encoding='gb18030')

    tn.close()

    #导出up状态的端口流量
    pd_result_2 = pd.DataFrame()
    list_temperature_vec = []
    for j in list_str:
        regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
        if len(re.findall(r"Interface", j)) > 0:
            new_columns = list_find_str = re.split(r'\s+', j)
            new_columns = new_columns[0:8]

        if len(regex.findall(j)) > 0:
            list_find_str = regex.findall(j)[0]
            list_find_str = re.split(r'\s+', list_find_str)
            list_temperature_vec.append(list_find_str)
    pd_result_2 = pd.DataFrame(list_temperature_vec)
    pd_result_2.columns = new_columns
    pd_result_2 = pd_result_2[pd_result_2['Phy'] == 'up']
    # pd_output = pd.DataFrame.pd_result_2
    # print(pd_output)
    # print(pd_result_2)
    # 端口流量及状态检查

    pd_output = pd.DataFrame()
    for check_port in pd_result_2['Interface']:
        pd_each_port = pd.DataFrame()
        dict_ouput = {}

        dict_ouput["port_name"] = check_port
        dict_ouput["time"] = run_time

        command = " show interface " + check_port
        command = bytes(command, encoding='utf-8')
        tn.write(command + b'\n')
        time.sleep(1)

        result_list = []
        while (True):
            command_result = tn.read_very_eager().decode('ascii')
            # print(command_result)
            result_list.append(command_result)
            if re.findall(r"--More--", command_result.strip()):
                tn.write(b" ")

            elif re.findall(r"#", command_result.strip()):
                break
            else:
                time.sleep(0.05)
                continue
        result_str = "\n".join(result_list)

        startpattern = re.compile(r'In_Bytes\s+(\d+)\s+')
        strtext = re.search(startpattern, str(result_str)).group(1)
        strtext = int(strtext)
        dict_ouput["rx_bytes"] = strtext

        startpattern = re.compile(r'E_Bytes\s+(\d+)\s+')
        strtext = re.search(startpattern, str(result_str)).group(1)
        strtext = int(strtext)
        dict_ouput["tx_bytes"] = strtext
        pd_output = pd.concat([pd_output, pd.DataFrame.from_dict([dict_ouput])], axis=0)

    pd_output['time'] = pd.to_datetime(pd_output['time'])
    pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
    pd_output["id"] = pd_output["port_name"] + '_' + pd_output["time"]
    random_suffix = uuid.uuid4().hex[:8]
    # 构造文件名,并保存DataFrame到CSV文件
    filename = f"端口流量-{host_ip}-{random_suffix}.csv"
    filepath = os.path.join(path, filename)

    # 保存DataFrame到CSV文件
    pd_output.to_csv(filepath, index=False, encoding='gb18030')


#绘制折线图

# 指定CSV文件路径模式,例如所有在当前目录下以.csv结尾的文件
file_pattern = r"E:\竞赛\电信大数据实战\omscript-master\CSV数据\*.csv"

# 使用glob模块找到符合模式的所有文件
csv_files = glob.glob(file_pattern)

# 创建一个空列表用于存储每个文件的数据
data_frames = []

# 循环遍历所有文件
for file in csv_files:
    # 读取CSV文件到DataFrame
    df = pd.read_csv(file,encoding='gb18030')

    # 将读取的数据添加到列表
    data_frames.append(df)

# 使用pd.concat将所有的DataFrame合并成一个
combined_df = pd.concat(data_frames, ignore_index=True)

# 假设时间列是第一列,索引为0
time_column_index = 1  # 这里假设时间列是第一列,根据实际情况调整
time_column_name = combined_df.columns[time_column_index]

# 转换时间列为datetime类型
combined_df[time_column_name] = pd.to_datetime(combined_df[time_column_name])

# 按照时间列进行排序
combined_df = combined_df.sort_values(by=time_column_name)
# 现在combined_df包含了所有CSV文件中的数据
# print(combined_df)

# 设置字体以支持中文
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# 假设第二列是X轴数据,第三列是Y轴数据
x = combined_df.iloc[:, 1]  # 获取第二列作为X轴数据
y1 = combined_df.iloc[:, 2]  # 获取第三列作为Y轴数据
y2 = combined_df.iloc[:, 3] # 获取第四列作为Y轴数据

# 计算差值
y1_diff= y1.diff()
y2_diff= y2.diff()
# 删除nan值
y1_diff= y1_diff.dropna()
y2_diff= y2_diff.dropna()
# 更新X轴数据以匹配差值后的数据
x_diff=x[y1_diff.index]


# 绘制折线图
plt.figure()  # 创建一个新的图表
plt.plot(x_diff, y1_diff, marker='o', linestyle='-', color='blue',label=combined_df.columns[2])
plt.plot(x_diff, y2_diff, marker='s', linestyle='--', color='red',label=combined_df.columns[3])# 绘制折线图,并用圆圈标记每个数据点
plt.title("xgei-0/2/0/2出入口流量")  # 图表标题
plt.xlabel(combined_df.columns[1])  # X轴标签,使用CSV的第二列标题
plt.ylabel("流量")  # Y轴标签,使用CSV的第三列标题
plt.legend() # 显示图例
plt.grid(True)  # 显示网格

# 设置x轴刻度标签的旋转角度
plt.xticks(rotation=90)

plt.tight_layout()

plt.show()

#定时循环执行操作

# 定义一个包装函数来调用实际的 do_telnet 函数
def do_telnet_job():
    do_telnet()

if __name__ == '__main__':

# 设置每隔5秒执行一次 do_telnet_job 函数
    schedule.every(5).seconds.do(do_telnet_job)

# 主循环,持续运行以检查并执行计划任务
    while True:
        schedule.run_pending()
        time.sleep(1)






通信人家园 (https://test.txrjy.com/) Powered by C114