new test for mp

This commit is contained in:
2018-05-17 21:44:45 +02:00
parent 32828d3a65
commit f78ddbc005
8 changed files with 324 additions and 25 deletions

View File

@@ -14,7 +14,7 @@ import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Gio, GObject as gobject
import time
from multiprocessing import Process, Value, Lock, Pool, Queue
from multiprocessing import Process, Value, Lock, Pool, Queue, cpu_count
import threading
gobject.threads_init()
@@ -38,6 +38,7 @@ class Listener(gobject.GObject):
while True:
# Listen for results on the queue and process them accordingly
data = self.queue.get()
print(data)
# Check if finished
if data[1]=="finished":
print("Listener is finishing.")
@@ -58,7 +59,7 @@ class Worker():
for i in range(self.filecnt):
proportion = (float(i+1))/self.filecnt
self.queue.put((proportion, "working...", i))
time.sleep(0.01)
#time.sleep(0.01)
process_file(filelist[i])
self.queue.put((1.0, "finished"))
print("The worker has finished.")
@@ -70,7 +71,7 @@ folderlist = []
cnt = 0
srcdir = ''
destdir = ''
#num_cores = multiprocessing.cpu_count()
num_cores = cpu_count()
fileindex = 0
filecnt = 0
incinc = ''
@@ -334,9 +335,10 @@ class ExampleApp:
return
print("Creating shared Queue")
queue = Queue()
queue = Queue(num_cores)
print("Creating Worker")
for c in range(num_cores):
print("Creating Worker: ", c)
worker = Worker(queue, self.filecnt, filelist)
print("Creating Listener")

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python3.5
import multiprocessing
class Listener(multiprocessing.Process):
    def __init__(self, response):
        multiprocessing.Process.__init__(self)
        self.response = response
    def run(self):
        while True:
            r = self.response.get()
            if r is None:
                break
            print(r)
            self.response.task_done()
        # account for the None sentinel as well
        self.response.task_done()
        return
class Worker(multiprocessing.Process):
    def __init__(self, workload):
        multiprocessing.Process.__init__(self)
        self.workload = workload
    def run(self):
        while True:
            n = self.workload.get()
            print(n*n)
            self.workload.task_done()
work = list(range(10))
print(work)
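The new file defines the two classes but stops before creating queues or starting the processes. A minimal sketch of how they could be wired together, assuming JoinableQueues and a None sentinel to stop the Listener (the driver code below is illustrative, not part of the commit):

if __name__ == '__main__':
    workload = multiprocessing.JoinableQueue()   # items for the Worker
    response = multiprocessing.JoinableQueue()   # items for the Listener
    worker = Worker(workload)
    listener = Listener(response)
    worker.start()
    listener.start()
    for n in work:
        workload.put(n)                          # Worker squares and prints each item
    workload.join()                              # wait until every item is processed
    response.put(None)                           # sentinel: tell the Listener to stop
    listener.join()
    worker.terminate()                           # Worker loops forever, stop it explicitly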

View File

@@ -0,0 +1,124 @@
#import gobject
#import pygtk
#pyGtk.require('2.0')
#import gtk
#import multiprocessing
#import threading
#import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Gio, GObject as gobject
import time
from multiprocessing import Process, Value, Lock, Pool, Queue, cpu_count
from threading import Thread
gobject.threads_init()
class Listener(gobject.GObject):
    __gsignals__ = {
        'updated' : (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     (gobject.TYPE_FLOAT, gobject.TYPE_STRING)),
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }
    def __init__(self, queue):
        gobject.GObject.__init__(self)
        self.queue = queue
    def go(self):
        print("Listener has started")
        while True:
            # Listen for results on the queue and process them accordingly
            data = self.queue.get()
            # Check if finished
            if data[1]=="finished":
                print("Listener is finishing.")
                self.emit("finished")
                return
            else:
                self.emit('updated', data[0], data[1])
gobject.type_register(Listener)
class Worker():
    def __init__(self, queue):
        self.queue = queue
    def go(self):
        print("The worker has started doing some work (counting from 0 to 9)")
        for i in range(10):
            proportion = (float(i+1))/10
            self.queue.put((proportion, "working..."))
            time.sleep(0.5)
        self.queue.put((1.0, "finished"))
        print("The worker has finished.")
class Interface:
    def __init__(self):
        self.process = None
        self.progress = Gtk.ProgressBar()
        button = Gtk.Button("Go!")
        button.connect("clicked", self.go)
        vbox = Gtk.VBox(spacing=5)
        vbox.pack_start(self.progress, True, True, 0)
        vbox.pack_start(button, True, True, 0)
        vbox.show_all()
        self.frame = vbox
    def main(self):
        window = Gtk.Window()
        window.set_border_width(10)
        window.add(self.frame)
        window.show()
        window.connect("destroy", self.destroy)
        Gtk.main()
    def destroy(self, widget, data=None):
        Gtk.main_quit()
    def callbackDisplay(self, obj, fraction, text, data=None):
        self.progress.set_fraction(fraction)
        self.progress.set_text(text)
    def callbackFinished(self, obj, data=None):
        if self.process==None:
            raise RuntimeError("No worker process started")
        print("all done; joining worker process")
        self.process.join()
        self.process = None
        self.progress.set_fraction(1.0)
        self.progress.set_text("done")
    def go(self, widget, data=None):
        if self.process!=None:
            return
        print("Creating shared Queue")
        queue = Queue()
        print("Creating Worker")
        worker = Worker(queue)
        print("Creating Listener")
        listener = Listener(queue)
        listener.connect("updated",self.callbackDisplay)
        listener.connect("finished",self.callbackFinished)
        print("Starting Worker")
        self.process = Process(target=worker.go, args=())
        self.process.start()
        print("Starting Listener")
        thread = Thread(target=listener.go, args=())
        thread.start()
if __name__ == '__main__':
    gui = Interface()
    gui.main()

View File

@@ -0,0 +1,17 @@
import threading
import multiprocessing
import time
class MyThread(threading.Thread):
    def run(self):
        time.sleep(5)
        return
if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()
        print(t.name, 'is alive -> ',t.is_alive())
        t.join()
        print(t.name, 'is alive -> ',t.is_alive())
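The test above imports multiprocessing but only exercises threading.Thread. For comparison, a minimal sketch of the same is_alive()-before-and-after-join check using multiprocessing.Process (the class name MyProcess is illustrative, not from the commit):

import multiprocessing
import time

class MyProcess(multiprocessing.Process):
    def run(self):
        time.sleep(5)
        return

if __name__ == '__main__':
    for i in range(3):
        p = MyProcess()
        p.start()
        print(p.name, 'is alive -> ', p.is_alive())
        p.join()
        print(p.name, 'is alive -> ', p.is_alive())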

View File

@@ -0,0 +1,45 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
import queue
running = multiprocessing.Value('i', 1)
request = multiprocessing.JoinableQueue()
response = multiprocessing.Queue(1)
work=[]
def worker(request, response):
    while True:
        try:
            param = request.get(timeout=0.1)
        except queue.Empty:
            # no request arrived in time; report back and stop
            response.put("work_done")
            return
        # Imagine heavy computation here.
        result = param ** 2
        response.put(result)
        request.task_done()
def main():
    for n in range(10):
        work.append(n)
    process = multiprocessing.Process(target=worker, args=(request, response))
    process.start()
    for i in work:
        print("Queueing {}".format(i))
        request.put(i)
        result = response.get()
        if result != "work_done":
            print('Result', result)
    process.join()
if __name__ == '__main__':
    main()

View File

@@ -3,29 +3,38 @@ import multiprocessing
import time
import sys
def network_worker(tq, wq):
def network_worker(tq, wq, eq):
ticker_queue = tq
work_queue = wq
while ticker_queue.empty() is not True:
ticker = tq.get()
print(threading.currentThread().getName(),"recieved ",ticker)
ticker = tq.get(0.1)
print(threading.currentThread().getName(),"received ",ticker)
work_queue.put(ticker-1000)
time.sleep(0.02)
#ticker_queue.task_done()
work_queue.put(None)
time.sleep(0.01)
ticker_queue.task_done()
exit_queue.put(1)
#work_queue.close()
return
def worker(wq):
def worker(wq, i):
work_queue = wq
while work_queue.get() is not None:
task = work_queue.get()
print(multiprocessing.current_process(),"recieved",task)
#work_queue.task_done()
#while work_queue.get(0.1) is not None:
while work_queue.qsize():
print(work_queue.qsize())
#if work_queue.empty() == False:
with i.get_lock():
i.value += 1
task = work_queue.get(10)
print(multiprocessing.current_process(),"received",task)
#simulate work
time.sleep(0.001)
work_queue.task_done()
return
ticker_queue = multiprocessing.JoinableQueue()
work_queue = multiprocessing.JoinableQueue()
exit_queue = multiprocessing.JoinableQueue()
iterations = multiprocessing.Value('i', 0)
tickers = range(1000)
processes = []
@@ -33,24 +42,26 @@ for i in tickers:
ticker_queue.put(i)
for i in range(20):
t = threading.Thread(target=network_worker, args = (ticker_queue, work_queue))
t = threading.Thread(target=network_worker, args = (ticker_queue, work_queue, exit_queue))
t.deamon = True
print("Starting: ",t.name)
t.start()
for i in range(4):
p = multiprocessing.Process(target = worker, args = (work_queue, ))
#p.deamon = True
for i in range(8):
p = multiprocessing.Process(target = worker, args = (work_queue, iterations, ))
p.deamon = True
print("Starting: ",p.name)
p.start()
processes.append(p)
if work_queue.empty() == True:
ticker_queue.join()
print(exit_queue.empty())
if exit_queue.empty() == False:
work_queue.join()
for p in processes:
p.terminate()
p.join()
print("Resulting work:", work_queue)
print("Total number of iterations:", iterations.value)

View File

@@ -0,0 +1,70 @@
import threading
import multiprocessing
import time
import sys
def network_worker(tq, wq):
    while tq.empty() is not True:
        ticker = tq.get(timeout=0.1)
        print(threading.currentThread().getName(),"received ",ticker)
        wq.put(ticker-1000)
        #simulate work
        time.sleep(0.01)
        tq.task_done()
    # tell the workers there is nothing left to do
    for i in range(8):
        wq.put(None)
def worker(wq, i):
    while True:
        task = wq.get(timeout=2)
        with i.get_lock():
            if task is None:
                break
            i.value += 1
        print(multiprocessing.current_process().name, task, wq.qsize(), i.value)
        #simulate work
        time.sleep(0.001)
        wq.task_done()
    print(task)
    wq.task_done()
    print("Closing: ", multiprocessing.current_process().name)
ticker_queue = multiprocessing.JoinableQueue()
work_queue = multiprocessing.JoinableQueue()
iterations = multiprocessing.Value('i', 0)
tickers = range(1000)
processes = []
threads = []
for i in tickers:
    ticker_queue.put(i)
for i in range(20):
    t = threading.Thread(target=network_worker, args = (ticker_queue, work_queue))
    t.daemon = True
    print("Starting: ",t.name)
    t.start()
    threads.append(t)
for i in range(8):
    p = multiprocessing.Process(target = worker, args = (work_queue, iterations, ))
    p.daemon = True
    print("Starting: ",p.name)
    p.start()
    processes.append(p)
print("Closing down threads")
for t in threads:
    print("Closing: ",t.name)
    t.join()
print("Closing down workers")
for p in processes:
    p.join()
print("Total number of iterations:", iterations.value)