通信人家园

标题: Python建立socket测试  [查看完整版帖子] [打印本页]

时间:  2013-7-14 15:02
作者: sunneverrust     标题: Python建立socket测试

嵌入式Web 服务器,需要压力测试,正好可以使用Python快速写一个小脚本,只花了半天时间,推荐给大家,继续完善,可以多线程发包,长连接或者短连接。
import socket #for sockets
import sys #for exit
import time
import thread,threading  
remote_ip = '60.1.21.163'
port = 80
sockIndex = 1
def connToServer (x,times):
    global sockIndex   
    Reconnect = 0
    try:
        #create an AF_INET, STREAM socket (TCP)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((remote_ip , port))
    except socket.error:
        print ('Failed to create socket. Error code: ')
        sys.exit();

    for i in range(1, times):
        try:
            #global Reconnect
            if(1 == Reconnect):
                print('Reconnect')
                s.close
                time.sleep(1)
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect((remote_ip , port))
                print('Reconnect success')
                Reconnect = 0
            #s.sendall('POST /OMC_TOOL?action=UDT/ACS_OAM_CTRL HTTP/1.1\r\n')
            #s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\nconnection:  keep-alive\r\n\r\n')
            s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            #s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            #s.sendall('GET /5OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            #s.sendall('GET /6OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            #s.sendall('GET /7OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            #s.sendall('GET /8OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            #s.sendall('GET /9OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
            
            print ('socket s Senddata Successfully!')
            time.sleep(1)
        except socket.error,arg:
            #global Reconnect
            Reconnect =1   
            (errno,err_msg) = arg
            print"fail:%s,errno=%d" %(err_msg,errno)
            s.close
def test(): #Use thread.start_new_thread() to create 2 new threads  
    thread.start_new_thread(connToServer,(1,10000))  
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))
    thread.start_new_thread(connToServer,(1,10000))      
   
if __name__=='__main__':  
    test()  
print ('Message test over')



时间:  2013-7-14 20:09
作者: PDSCH

python很强大




通信人家园 (https://www.txrjy.com/) Powered by C114