Files
h2inc-old/Multiprocessing test/ticker_gtk.py
2018-07-29 14:14:04 +02:00

340 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3.5
# You are free to use and/or change this code for
# your own needs.
# Original code (c)2018 Jan Lerking
# Program to convert C-header (*.h) files to nasm include files (*.inc),
# for direct usage in assembly programming using nasm/yasm.
import sys
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Gio, GObject as gobject
import multiprocessing
import threading
import globvar
import time
#from h2inc_mp import start_workers
#from h2inc_fp import sourcedir_filecnt, sourcedir_foldercnt
# Project-local setup, run once at import time before any class below is used.
# presumably this populates shared values such as globvar.ticker_num, which
# t_gtk.__init__ reads — TODO confirm against globvar.py.
globvar.init()
# Legacy PyGObject thread-initialisation call, made before any thread starts.
# NOTE(review): PyGObject documents threads_init() as deprecated and no longer
# required in recent releases — confirm against the installed version.
gobject.threads_init()
class Listener(gobject.GObject):
    """Translate messages read from a queue into GObject signals.

    ``go`` is meant to be the target of a background thread. Each message is
    either a bare tag string (``"ticker_finished"``) or a sequence whose first
    item is the tag and whose remaining items are the signal arguments
    (``("ticker_update", 5)``). A ``None`` message stops the listener.

    Signals are not emitted on the listener thread: every emission is
    scheduled with ``GLib.idle_add`` so that connected handlers (which update
    GTK widgets) run on the thread that iterates the GLib main loop. A running
    main loop is therefore required for the signals to be delivered.

    The listener deliberately keeps running after the ticker has finished,
    because worker updates and ``workers_finished`` arrive later than that.
    """

    __gsignals__ = {
        'ticker_update': (gobject.SIGNAL_RUN_LAST,
                          gobject.TYPE_NONE,
                          (gobject.TYPE_INT,)),
        # Declared so that the "ticker finished" message can actually be
        # emitted; emitting an undeclared signal raises TypeError.
        'ticker_finished': (gobject.SIGNAL_RUN_LAST,
                            gobject.TYPE_NONE,
                            ()),
        'handlers_update': (gobject.SIGNAL_RUN_LAST,
                            gobject.TYPE_NONE,
                            (gobject.TYPE_FLOAT,
                             gobject.TYPE_FLOAT,
                             gobject.TYPE_FLOAT,)),
        'handlers_finished': (gobject.SIGNAL_RUN_LAST,
                              gobject.TYPE_NONE,
                              ()),
        'workers_update': (gobject.SIGNAL_RUN_LAST,
                           gobject.TYPE_NONE,
                           (gobject.TYPE_PYOBJECT,)),
        'workers_finished': (gobject.SIGNAL_RUN_LAST,
                             gobject.TYPE_NONE,
                             ()),
    }

    # Message tag -> (signal to emit, number of signal arguments, log line).
    _ROUTES = {
        "t_finished": ("ticker_finished", 0, "Ticker is finishing."),
        "ticker_finished": ("ticker_finished", 0, "Ticker is finishing."),
        "ticker_update": ("ticker_update", 1, "Ticker is running!"),
        "handler_update": ("handlers_update", 3, "Handlers are working."),
        "handlers_finished": ("handlers_finished", 0,
                              "Handlers are finishing."),
        "work_update": ("workers_update", 1, "Workers are working."),
        "work_finished": ("workers_finished", 0, "Workers are finishing."),
    }

    def __init__(self, queue):
        """Store the queue that ``go`` will read messages from."""
        gobject.GObject.__init__(self)
        self.signal_queue = queue

    @staticmethod
    def _split(data):
        """Return ``(tag, args)`` for a message, or ``(None, ())`` if malformed."""
        if isinstance(data, str):
            # A bare string is a tag without arguments; indexing it would
            # only yield its first character.
            return data, ()
        try:
            items = tuple(data)
        except TypeError:
            return None, ()
        if not items or not isinstance(items[0], str):
            return None, ()
        return items[0], items[1:]

    def go(self):
        """Read messages until a ``None`` sentinel arrives, emitting signals.

        Unknown tags and messages carrying too few arguments for their signal
        are reported and skipped instead of ending the loop, so one bad
        message cannot silently stop all further UI updates.
        """
        # Imported here so the file's import block does not have to change.
        from gi.repository import GLib

        print("Listener has started")
        while True:
            # Listen for results on the queue and process them accordingly
            data = self.signal_queue.get()
            print(data)
            if data is None:
                print("Listener is stopping.")
                return
            tag, args = self._split(data)
            route = self._ROUTES.get(tag) if tag is not None else None
            if route is None:
                print("Listener: ignoring unknown message", data)
                continue
            signal_name, argc, log_line = route
            if len(args) < argc:
                print("Listener: message lacks arguments for", signal_name)
                continue
            print(log_line)
            # emit() returns None, so the idle source runs exactly once.
            GLib.idle_add(self.emit, signal_name, *args[:argc])


gobject.type_register(Listener)
class Worker():
    """Consume tasks from a work queue and report progress on a signal queue.

    Parameters of the constructor:
        l:  lock (context manager) guarding the queue-size read and the
            progress report.
        sq: queue that receives progress messages.
        wq: joinable queue of tasks; a ``None`` task tells the worker to stop.
        i:  shared counter exposing ``value`` and ``get_lock()``, incremented
            once per processed task.
    """

    def __init__(self, l, sq, wq, i):
        self.lock = l
        self.signal_queue = sq
        self.work_queue = wq
        self.iterations = i
        # Latest progress snapshot: [task, items left in queue, tasks done].
        self.fractions = []

    def go(self):
        """Process tasks until a ``None`` sentinel is received.

        For every task a ``("work_update", [task, queue_size, done_count])``
        message is put on the signal queue; when the sentinel arrives a final
        ``("work_finished",)`` message is sent. ``task_done()`` is called once
        for every item taken from the work queue, sentinel included.
        """
        while True:
            print("Working")
            # Plain blocking get: a positional argument here would be the
            # ``block`` flag, not a timeout.
            task = self.work_queue.get()
            print(task)
            if task is None:
                break
            with self.lock:
                print("Lock acquired!")
                print("Getting qsize")
                tasks = self.work_queue.qsize()
                print(tasks)
            with self.iterations.get_lock():
                self.iterations.value += 1
                done = self.iterations.value
            print(done)
            #tasks = tasks/1000
            time.sleep(0.001)
            self.work_queue.task_done()
            # A fresh three-item snapshot per task: an ever-growing list
            # would leave elements 0..2 describing the first task forever.
            self.fractions = [task, tasks, done]
            with self.lock:
                # One tuple is one message; a second positional argument to
                # put() is the ``block`` flag and would drop the payload.
                self.signal_queue.put(("work_update", self.fractions))
                print(self.fractions)
        # Account for the sentinel so that work_queue.join() can return.
        self.work_queue.task_done()
        self.signal_queue.put(("work_finished",))
class Handler():
    """Move items from the ticker queue to the work queue.

    Parameters of the constructor:
        sq: queue that receives the final status message.
        tq: joinable queue of ticker values; ``None`` tells the handler to stop.
        wq: queue that receives ``ticker - 1000`` for every ticker value.
        i:  shared iteration counter (stored, not used by ``go``).
    """

    def __init__(self, sq, tq, wq, i):
        self.signal_queue = sq
        self.ticker_queue = tq
        self.work_queue = wq
        self.iterations = i

    def go(self):
        """Forward ticker values until a ``None`` sentinel is received.

        The queue is polled with a real 0.1 s timeout rather than an
        ``empty()`` check followed by a blocking ``get``: that pattern spins
        at full CPU while the queue is empty and can block forever when
        another handler takes the item between the check and the ``get``.
        On the sentinel, ``("handlers_finished",)`` is put on the signal queue.
        """
        from queue import Empty

        while True:
            try:
                ticker = self.ticker_queue.get(timeout=0.1)
            except Empty:
                continue
            if ticker is None:
                # Same stop convention as Worker; makes the code below
                # the loop reachable.
                break
            self.work_queue.put(ticker-1000)
            time.sleep(0.01)
            self.ticker_queue.task_done()
        # Account for the sentinel so that ticker_queue.join() can return.
        self.ticker_queue.task_done()
        # A tagged tuple instead of "": an empty string cannot be indexed by
        # a consumer that looks at data[0].
        self.signal_queue.put(("handlers_finished",))
class Ticker():
    """Produce the numbers 1..i on the ticker queue, reporting each one.

    Parameters of the constructor:
        sq: queue that receives progress messages.
        tq: queue that receives the ticker values.
        i:  number of ticker values to produce.
    """

    def __init__(self, sq, tq, i):
        self.signal_queue = sq
        self.ticker_queue = tq
        self.tickers = i
        self.iterations = 1

    def go(self):
        """Put every remaining ticker value, then announce completion.

        After each value a ``("ticker_update", value)`` message is put on the
        signal queue; ``("ticker_finished",)`` follows the last one.
        """
        while self.iterations <= self.tickers:
            self.ticker_queue.put(self.iterations)
            time.sleep(0.01)
            # Tag and payload travel as one tuple: a second positional
            # argument to put() is the ``block`` flag, so the count would
            # never reach the consumer.
            self.signal_queue.put(("ticker_update", self.iterations))
            self.iterations += 1
        self.signal_queue.put(("ticker_finished",))
#def looprun(n):
#for i in tickers:
#ticker_queue.put(i)
#for i in range(20):
#t = threading.Thread(target = handler, args = (ticker_queue, work_queue))
#t.deamon = True
#t.start()
#threads.append(t)
#for i in range(8):
#p = multiprocessing.Process(target = worker, args = (work_queue, iterations, ))
#p.deamon = True
#p.start()
#processes.append(p)
#ticker_queue.join()
#for i in range(8):
#work_queue.put(None)
#work_queue.join()
#print("Closing down workers")
#for p in processes:
#p.join()
#print("Run:",n)
#print("Total number of iterations:", iterations.value)
#return
#for n in range(100):
#looprun(n+1)
class t_gtk:
    """GTK application that runs the ticker/handler/worker pipeline.

    The window is loaded from ``ticker.glade``. Pressing the execute button
    starts worker processes, a listener thread, a ticker thread and handler
    threads; the ``update_*`` / ``*_Finished`` methods are the signal
    handlers that reflect progress in the labels and progress bars.
    """

    HANDLER_THREADS = 20
    WORKER_PROCESSES = 8

    def __init__(self):
        self.ticker_put_count = 0
        self.ticker_current_count = 0
        self.ticker_processed_count = 0
        self.work_put_count = 0
        self.work_current_count = 0
        self.work_processed_count = 0
        self.ticker_num = globvar.ticker_num
        # Progress-bar step per counted item.
        self.fraction = 1/1000
        self.process = None
        self.signal_queue = multiprocessing.JoinableQueue()
        self.ticker_queue = multiprocessing.JoinableQueue()
        self.work_queue = multiprocessing.JoinableQueue()
        self.iterations = multiprocessing.Value('i', 0)
        self.tickers = globvar.ticker_num
        self.work_num = 0
        self.processes = []
        self.threads = []
        # Number of "workers finished" notifications received so far.
        self.workers_done = 0
        self.lock = multiprocessing.Lock()
        self.app = Gtk.Application.new("org.ticker", Gio.ApplicationFlags(0))
        self.app.connect("activate", self.on_app_activate)
        self.app.connect("shutdown", self.on_app_shutdown)

    @staticmethod
    def _as_text(value):
        """Return label text for a counter; whole floats print without '.0'."""
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    def _show(self, label_id, bar_id, count, fraction):
        """Write ``count`` to a label and ``fraction`` (clamped) to a bar."""
        # Gtk.Label.set_text only accepts str.
        self.obj(label_id).set_text(self._as_text(count))
        self.obj(bar_id).set_fraction(min(1.0, max(0.0, fraction)))

    def on_app_activate(self, app):
        """Build the window from ``ticker.glade`` and show the zero counters."""
        builder = Gtk.Builder()
        builder.add_from_file("ticker.glade")
        builder.connect_signals(self)
        self.obj = builder.get_object
        window = self.obj("window")
        window.set_application(app)
        window.set_wmclass("ticker", "ticker")
        window.set_title("ticker - v.0.2.15")
        window.show_all()
        self.obj("ticker_put_count_label").set_text(str(self.ticker_put_count))
        self.obj("ticker_items_count_label").set_text(str(self.ticker_current_count))
        self.obj("ticker_processed_count_label").set_text(str(self.ticker_processed_count))
        self.obj("work_put_count_label").set_text(str(self.work_put_count))
        self.obj("work_items_count_label").set_text(str(self.work_current_count))
        self.obj("work_processed_count_label").set_text(str(self.work_processed_count))

    def on_app_shutdown(self, app):
        self.app.quit()

    def run(self, argv):
        """Run the GTK application and return its exit status."""
        return self.app.run(argv)

    def update_ticker_put(self, obj=None, index=None):
        """Show how many ticker values have been put.

        Works both as a signal handler (``obj`` is the emitter, ``index`` the
        count) and when called directly with the count as only argument.
        """
        if index is None:
            index = obj
        self.val = index*self.fraction
        self._show("ticker_put_count_label", "ticker_put_progress",
                   index, self.val)

    def update_ticker(self, obj, index1, index2, index3, data=None):
        """Show the put / queued / processed ticker counts."""
        self.val_1 = index1*self.fraction
        self.val_2 = index2*self.fraction
        self.val_3 = index3*self.fraction
        self._show("ticker_put_count_label", "ticker_put_progress",
                   index1, self.val_1)
        self._show("ticker_items_count_label", "ticker_items_progress",
                   index2, self.val_2)
        self._show("ticker_processed_count_label", "ticker_processed_progress",
                   index3, self.val_3)

    def update_work(self, obj, index, data=None):
        """Show the three work counters carried in the sequence ``index``."""
        print(index)
        if len(index) < 3:
            # Not a complete snapshot; nothing meaningful to display.
            return
        self.val_1 = index[0]*self.fraction
        self.val_2 = index[1]*self.fraction
        self.val_3 = index[2]*self.fraction
        self._show("work_put_count_label", "work_put_progress",
                   index[0], self.val_1)
        self._show("work_items_count_label", "work_items_progress",
                   index[1], self.val_2)
        self._show("work_processed_count_label", "work_processed_progress",
                   index[2], self.val_3)

    def ticker_Finished(self, obj, data=None):
        """Show the ticker stage as complete.

        No handler threads are started here: they are created in
        ``on_execute_clicked``, and the previous attempt to start more passed
        the unbound ``Handler.go`` two queues, so every such thread died
        immediately with a TypeError.
        """
        self._show("ticker_put_count_label", "ticker_put_progress",
                   self.ticker_num, 1.0)
        self._show("ticker_items_count_label", "ticker_items_progress",
                   "0", 0.0)
        self._show("ticker_processed_count_label", "ticker_processed_progress",
                   self.ticker_num, 1.0)

    def workers_Finished(self, obj, data=None):
        """Count one finished worker; after the last, join all and show totals.

        Raises:
            RuntimeError: if no worker process has been started.
        """
        if not self.processes:
            raise RuntimeError("No worker process started")
        self.workers_done += 1
        if self.workers_done < len(self.processes):
            return
        print("all done; joining worker process")
        for proc in self.processes:
            proc.join()
        self.processes = []
        self.workers_done = 0
        self.process = None
        # The shared counter holds the number of tasks the workers processed.
        self.work_num = self.iterations.value
        self._show("work_put_count_label", "work_put_progress",
                   self.work_num, 1.0)
        self._show("work_items_count_label", "work_items_progress",
                   "0", 0.0)
        self._show("work_processed_count_label", "work_processed_progress",
                   self.work_num, 1.0)

    def on_window_destroy(self, window):
        window.close()

    def _release_workers(self, ticker_thread, count):
        """Wait for the ticker stage to drain, then send ``count`` sentinels.

        Runs in a background thread so the GTK main loop is never blocked.
        The ticker thread is joined first: joining only the queue could
        return early whenever the handlers momentarily catch up with the
        ticker, which would stop the workers while values are still coming.
        """
        ticker_thread.join()
        self.ticker_queue.join()
        for _ in range(count):
            self.work_queue.put(None)

    def on_execute_clicked(self, widget):
        """Start the whole pipeline: workers, listener, ticker and handlers."""
        # Worker processes are started before any pipeline thread so that
        # they are not forked while those threads are running.
        for _ in range(self.WORKER_PROCESSES):
            w = Worker(self.lock, self.signal_queue, self.work_queue, self.iterations)
            p = multiprocessing.Process(target=w.go, args=())
            p.daemon = True
            p.start()
            self.processes.append(p)
        print("Creating Listener")
        listener = Listener(self.signal_queue)
        listener.connect("ticker_update", self.update_ticker_put)
        listener.connect("handlers_update", self.update_ticker)
        listener.connect("handlers_finished", self.ticker_Finished)
        listener.connect("workers_update", self.update_work)
        listener.connect("workers_finished", self.workers_Finished)
        print("Starting Listener")
        thread = threading.Thread(target=listener.go, args=())
        # Daemon threads do not keep the process alive after the UI exits.
        thread.daemon = True
        thread.start()
        print("Creating ticker queue")
        ticker = Ticker(self.signal_queue, self.ticker_queue, self.tickers)
        ticker_thread = threading.Thread(target=ticker.go, args=())
        ticker_thread.daemon = True
        ticker_thread.start()
        for _ in range(self.HANDLER_THREADS):
            h = Handler(self.signal_queue, self.ticker_queue, self.work_queue, self.iterations)
            handler_thread = threading.Thread(target=h.go, args=())
            handler_thread.daemon = True
            handler_thread.start()
            self.threads.append(handler_thread)
        closer = threading.Thread(target=self._release_workers,
                                  args=(ticker_thread, self.WORKER_PROCESSES))
        closer.daemon = True
        closer.start()
# Only start the application when this file is executed as a script.
# multiprocessing start methods that re-import the main module in each child
# process would otherwise construct and run another application per worker.
if __name__ == "__main__":
    app = t_gtk()
    app.run(sys.argv)