通信人家园
标题:
Python建立socket测试
[查看完整版帖子]
[打印本页]
时间:
2013-7-14 15:02
作者:
sunneverrust
标题:
Python建立socket测试
嵌入式Web 服务器,需要压力测试,正好可以使用Python快速写一个小脚本,只花了半天时间,推荐给大家,继续完善,可以多线程发包,长连接或者短连接。
import socket #for sockets
import sys #for exit
import time
import thread,threading
remote_ip = '60.1.21.163'
port = 80
sockIndex = 1
def connToServer (x,times):
global sockIndex
Reconnect = 0
try:
#create an AF_INET, STREAM socket (TCP)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((remote_ip , port))
except socket.error:
print ('Failed to create socket. Error code: ')
sys.exit();
for i in range(1, times):
try:
#global Reconnect
if(1 == Reconnect):
print('Reconnect')
s.close
time.sleep(1)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((remote_ip , port))
print('Reconnect success')
Reconnect = 0
#s.sendall('POST /OMC_TOOL?action=UDT/ACS_OAM_CTRL HTTP/1.1\r\n')
#s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\nconnection: keep-alive\r\n\r\n')
s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
#s.sendall('GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
#s.sendall('GET /5OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
#s.sendall('GET /6OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
#s.sendall('GET /7OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
#s.sendall('GET /8OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
#s.sendall('GET /9OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n')
print ('socket s Senddata Successfully!')
time.sleep(1)
except socket.error,arg:
#global Reconnect
Reconnect =1
(errno,err_msg) = arg
print"fail:%s,errno=%d" %(err_msg,errno)
s.close
def test(): #Use thread.start_new_thread() to create 2 new threads
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
if __name__=='__main__':
test()
print ('Message test over')
时间:
2013-7-14 20:09
作者:
PDSCH
python很强大
通信人家园 (https://www.txrjy.com/)
Powered by C114