源码包下载(包含前端页面):http://itdaye.com/wp-content/uploads/2021/08/BMS.zip
1.搭建简单web服务器web.py
import socket import multiprocessing import sys import framework # 定义web服务类 class HttpWebServer(): def __init__(self,argv_port): # 创建套接字对象 HttpWebServer_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 设置端口号复用, 程序退出端口立即释放 HttpWebServer_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) # 绑定端口号和IP地址 HttpWebServer_socket.bind(('',argv_port)) # 设置监听 HttpWebServer_socket.listen(126) self.HttpWebSever_socket = HttpWebServer_socket # 处理客户端请求 def client_request(self,client_link_object): # 接受客户端数据 二进制数据需要进行解码 client_data = client_link_object.recv(4096) # 处理客户端接受到的数据 分离出访问的路径信息 mulu_msg = str(client_data.decode('utf-8')).split(' ', 2) request_path =mulu_msg[1] # 判断客户端请求是否是根目录 if request_path == '/': request_path = '/index.html' if request_path.endswith('.html'): """判断客户的请求是否是动态资源请求,动态资源请求交给web框架处理""" # 把接收到的客户数据存储到字典中 env ={ "request_path":request_path } status,headers,response_body = framework.handle_request(env) # 使用框架处理的数据拼接响应报文 # 响应行 response_line = "HTTP/1.1 %s\r\n" % status # 响应头 response_header ="" for header in headers: # 拼接多个响应头 response_header += "%s: %s\r\n" % header response_data = (response_line + response_header + "\r\n"+ response_body).encode('utf-8') #发送数据 client_link_object.send(response_data) #关闭socket client_link_object.close() else: """静态资源请求处理""" # 对客户端的提供的路径进行判断 try: with open('static' + request_path, 'rb') as file: data1 = file.read() except Exception as e: data1 = open('static/error.html', 'rb').read() # 响应行 response_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_head = 'Server: itdaye.com\r\n' # 响应体 data = (response_line + response_head + '\r\n').encode('utf-8') + data1 # 响应客户端数据 这个数据需要根据http响应报文格式拼接组成 client_link_object.send(data) # 关闭套接字 client_link_object.close() print('访问路径不存在') else: # 响应行 response_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_head = 'Server: itdaye.com\r\n' # 响应体 data = (response_line + response_head + '\r\n').encode('utf-8') + data1 # 响应客户端数据 这个数据需要根据http响应报文格式拼接组成 client_link_object.send(data) # 关闭套接字 client_link_object.close() # 定义一个启动函数,用来创建子进程 def start(self): while True: # 等待客户端连接请求 连接建立成功会返回两个数据,一个对象,和一个元祖数据(客户端ip和端口) client_link_object, ipandport = self.HttpWebSever_socket.accept() # 创建子进程 client_process = multiprocessing.Process(target=self.client_request, args=(client_link_object,), daemon=True) # 启动进程 client_process.start() # 定义一个主函数,用作程序入口 def main(): # argv_order = sys.argv # # if len(argv_order) != 2: # print('您输入的命令格式错误!!!') # return # if not argv_order[1].isdigit(): # print('端口号请输入数字') # return # argv_port = int(argv_order[1]) httpws = HttpWebServer(8000) httpws.start() if __name__ == '__main__': main()
2.搭建web框架framework.py
import time import pymysql import json route_list=[] #定义一个装饰器 def route(patch): def decorator(func): route_list.append((patch,func)) def inner(): func() return inner return decorator # 定义一个英雄对应图书的数据接口 @route("/herobooks_data.html") def herobooks_data(): # 响应状态 status = "200 OK" # 响应头 response_header = [("Server", "PWS2.0"),("Content-Type","charset=utf-8")] conn = pymysql.connect(host="192.168.1.128", port=3306, user='root', password='mysql', database='miniweb', charset='utf8') # 创建游标 cursor = conn.cursor() # 设置sql语句 sql = "select h.hname,b.btitle from tb_heros h left join tb_books b on h.hbook_id=b.id" cursor.execute(sql) result_sql = cursor.fetchall() print(type(result_sql)) cursor.close() conn.close() # 定义一个英雄对应图书数据列表 herotobook_list = list() # 遍历查询到的每一行数据并且转换成字典 for row in result_sql: # 创建空的字典 herobook_dict = dict() herobook_dict["hname"] = row[0] herobook_dict["btitle"] = row[1] herotobook_list.append(herobook_dict) # 把列表字典转换成json字符串 json_str = json.dumps(herotobook_list, ensure_ascii=False) print(json_str) return status, response_header, json_str @route("/index.html") def index(): # 响应状态 status = "200 OK" # 响应头 response_header = [("Server","PWS2.0")] with open("template/index.html", "r",encoding="utf-8") as file: file_data = file.read() result = file_data.replace("{%content%}", "") return status,response_header,result # 处理未找到的资源请求 def not_found(): # 响应状态 status = "404 Not Found" # 响应头 response_header = [("Server","PWS2.0")] # 处理后的数据 data = "Not Found" return status, response_header, data # route_list = [ # ('/index.html', index), # ('/center.html', center) # ] # 处理用户的动态资源请求 def handle_request(env): request_path =env['request_path'] print(request_path) for path,func in route_list: if request_path == path: #获取首页信息 result = func() return result else: #没有找的资源 result = not_found() return result if __name__ == '__main__': index()