Discussion:
Stop thread-ing when condition is met
Panagiotis Atmatzidis
2016-03-29 11:15:38 UTC
Permalink
Hello,

I have a ruby script which uses a thread pool to fetch JSON objects via http. Now, I’m using a static number of pages (see pseudo-code below) but I need to introduce a condition and stop the process when that condition is met (e.g. request returns a 404).

The structure now looks like this:

1.upto(100) do |n|
threads << Thread.new { ... }
end
threads.join()

What I don’t understand is how to apply a sort of break when a condition is met. I’m thinking something along the lines:

x = true
while x
threads << Thread.new {


break unless x
}
end
threads.join()

Will the above approach work? Is there a better one? How do people deal in these situations with threads?

thanks!


Panagiotis (atmosx) Atmatzidis

email: ***@convalesco.org
URL: http://www.convalesco.org
GnuPG ID: 0x1A7BFEC5
gpg --keyserver pgp.mit.edu --recv-keys 1A7BFEC5

"Everyone thinks of changing the world, but no one thinks of changing himself.” - Leo Tolstoy
Matthew Kerwin
2016-03-29 11:54:10 UTC
Permalink
Post by Panagiotis Atmatzidis
Hello,
I have a ruby script which uses a thread pool to fetch JSON objects via
http. Now, I’m using a static number of pages (see pseudo-code below) but I
need to introduce a condition and stop the process when that condition is
met (e.g. request returns a 404).
​Much depends on which "process" you're stopping. More below.
Post by Panagiotis Atmatzidis
1.upto(100) do |n|
threads << Thread.new { ... }
end
threads.join()
What I don’t understand is how to apply a sort of break when a condition
x = true
while x
threads << Thread.new {


break unless x
}
end
threads.join()
Will the above approach work? Is there a better one? How do people deal in
these situations with threads?
I gather what you want to do is: stop spawning threads when you find a 404.
The problem is, `while true; Thread.new; end` runs very fast. You'll end up
with hundreds or thousands of threads queued up before the first one can
read a response (and detect a 404.) If it was me, I'd combine the two
approaches, wrapping the thread pool inside the while-loop; something like:

x = true
while x
1.upto(100) do |n|
threads << Thread.new do
# ...fetch JSON...
x = false if status==404?
end
end
threads.join()
end

That way you only have at most 100 threads (in the pool) wasting time
returning 404s. Personally I'd turn that number way down, maybe 10 or so,
depending on your circumstances (network bandwidth and latency, available
memory, processor speed, etc.)

Incidentally, I made a gem many years ago just for this sort of situation:

require 'threadpuddle'
tp = ThreadPuddle.new 10
x = true
while x
tp.spawn do
break unless x
# ...fetch JSON...
x = false if status==404?
end
end
tp.join

Does that look like what you're trying to do?

thanks!
No worries, I hope it helps.

Cheers.
--
Matthew Kerwin
http://matthew.kerwin.net.au/
Recursive Madman
2016-03-29 14:27:11 UTC
Permalink
There are essentially two ways to "stop" a thread:
1) The thread has no more code to run (i.e. the block given to
`Thread.new` returns)
2) The thread is killed (which can happen on the outside).

it is usually a good idea to synchronize state between threads using
mutexes and condition variables.
For example. you could do the following:

m = Mutex.new
cv = ConditionVariable.new
1.upto(100) do |n|
threads << Thread.new {
loop {
status = #...
m.synchronize { cv.signal } if status == 404
}
}
end

m.synchronize {
cv.wait(m) # this will block until the `cv.signal` above is called
}
# once one thread signaled 404, let's kill all the threads
threads.each(&:kill)


This solution has one big problem: even though one thread may have
received 404, other threads could still be waiting for a (possibly
successful) response.
Killing the thread would throw away those responses. To solve this, you
could use another variable on the outside (like `done`) and change your
thread's loop like:

loop {
break if done
# ...
}

Note that this may still not do what you want. It is based on the
assumption that whenever you see 404, all other threads have at least
started processing the last potentially successful request.

I hope this helps somehow,

- rm
Post by Panagiotis Atmatzidis
Hello,
I have a ruby script which uses a thread pool to fetch JSON objects
via http. Now, I’m using a static number of pages (see pseudo-code
below) but I need to introduce a condition and stop the process when
that condition is met (e.g. request returns a 404).
1.upto(100) do |n|
threads << Thread.new { ... }
end
threads.join()
What I don’t understand is how to apply a sort of break when a
x = true
while x
threads << Thread.new {
…
break unless x
}
end
threads.join()
Will the above approach work? Is there a better one? How do people
deal in these situations with threads?
thanks!
Panagiotis (atmosx) Atmatzidis
URL:http://www.convalesco.org
GnuPG ID: 0x1A7BFEC5
gpg --keyserver pgp.mit.edu --recv-keys 1A7BFEC5
"Everyone thinks of changing the world, but no one thinks of changing
himself.” - Leo Tolstoy
<http://lists.ruby-lang.org/cgi-bin/mailman/options/ruby-talk>
daniele cantarini
2016-03-29 14:48:09 UTC
Permalink
Use Fibers,
they are perfect for this use
Post by Panagiotis Atmatzidis
Hello,
I have a ruby script which uses a thread pool to fetch JSON objects via
http. Now, I’m using a static number of pages (see pseudo-code below) but I
need to introduce a condition and stop the process when that condition is
met (e.g. request returns a 404).
1.upto(100) do |n|
threads << Thread.new { ... }
end
threads.join()
What I don’t understand is how to apply a sort of break when a condition
x = true
while x
threads << Thread.new {


break unless x
}
end
threads.join()
Will the above approach work? Is there a better one? How do people deal in
these situations with threads?
thanks!
Panagiotis (atmosx) Atmatzidis
URL: http://www.convalesco.org
GnuPG ID: 0x1A7BFEC5
gpg --keyserver pgp.mit.edu --recv-keys 1A7BFEC5
"Everyone thinks of changing the world, but no one thinks of changing
himself.” - Leo Tolstoy
<http://lists.ruby-lang.org/cgi-bin/mailman/options/ruby-talk>
Robert Klemme
2016-03-29 16:38:54 UTC
Permalink
On Tue, Mar 29, 2016 at 1:15 PM, Panagiotis Atmatzidis
Post by Panagiotis Atmatzidis
Hello,
I have a ruby script which uses a thread pool to fetch JSON objects via
http. Now, I’m using a static number of pages (see pseudo-code below) but I
need to introduce a condition and stop the process when that condition is
met (e.g. request returns a 404).
1.upto(100) do |n|
threads << Thread.new { ... }
end
threads.join()
This does not work (assuming threads is an Array). Array#join does
something completely different from Thread#join!
Post by Panagiotis Atmatzidis
What I don’t understand is how to apply a sort of break when a condition is
We'd first need to know where that condition occurs and what sort of
processing you want to do. The usual approach is: create a thread pool
with a fixed # of threads, each pulling from a queue. If you want to
shut down the pool you send as many special tokens into the pool as
there are threads and every thread stops when it reads that.

The main thread usually joins on all threads subsequently.
Post by Panagiotis Atmatzidis
x = true
while x
threads << Thread.new {

break unless x
}
end
threads.join()
Will the above approach work?
Unlikely.
Post by Panagiotis Atmatzidis
Is there a better one? How do people deal in
these situations with threads?
See above. Example:

# start thread pool
queue = SizedQueue.new 1000
# unbounded:
# queue = Queue.new

threads = (1..100).map do
Thread.new do
while (item = queue.deq) != :terminate
# process item
p item
end
end
end

# put work in queue
queue.enq "some work"

# trigger termination, could also be done from any other thread
threads.each { queue.enq :terminate }

# wait for termination
threads.each(&:join)

Kind regards

robert
--
[guy, jim, charlie].each {|him| remember.him do |as, often| as.you_can
- without end}
http://blog.rubybestpractices.com/

Unsubscribe: <mailto:ruby-talk-***@ruby-lang.org?subject=unsubscribe>
<http://lists.ruby-lang.org/cgi-bin/ma
Panagiotis Atmatzidis
2016-03-29 20:27:34 UTC
Permalink
Hello,

I am using ruby-thread[1] for the pool. Here is the code:


def collect
items = []
pages = 59
pool = Thread.pool(10)
mutex = Mutex.new
1.upto(pages) do |n|
pool.process {
uri = URI(@uri)
params = {
:page => n,
:page_size => 1000
}
uri.query = URI.encode_www_form(params)
puts "Fetching from #{uri}"
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
request = Net::HTTP::Get.new(uri.request_uri)
request.basic_auth(@user, @password)
begin
response = http.request(request)
response.is_a?(Net::HTTPSuccess) ? data = JSON.parse(response.body) : data = {}
rescue SocketError
@log.info <http://log.info/>(“Something happened on the way to heaven: #{e}"
end
begin
data.each do |d|
# ...mangle...
mutex.synchronize {
items.push(p) # avoid race condition
}
end
rescue => e
puts "Trouble parsing event: #{e}"
end
}
end
pool.shutdown
File.open('sample-mutex.txt', 'w') {|f| f.write(items)}
end

The above version works but it’s not dynamic, depends on the number of pages. I’ve tried implementing “break" as suggested by M. Kerwin in various ways without success.

The condition I want to meet is: data.key?(“next”) # => false - When this condition is met, I’d like to have to stop creating new threads while allowing current threads to finish their work.


Is it true the Fibers are better suited for this work as Daniele suggested?

thanks

ps. For better readability you can view the code on github: https://gist.github.com/atmosx/9afd96911515fc60ee4e <https://gist.github.com/atmosx/9afd96911515fc60ee4e>

[1] https://github.com/meh/ruby-thread <https://github.com/meh/ruby-thread>


Panagiotis (atmosx) Atmatzidis

email: ***@convalesco.org
URL: http://www.convalesco.org
GnuPG ID: 0x1A7BFEC5
gpg --keyserver pgp.mit.edu --recv-keys 1A7BFEC5

"Everyone thinks of changing the world, but no one thinks of changing himself.” - Leo Tolstoy
Robert Klemme
2016-03-30 16:29:20 UTC
Permalink
On Tue, Mar 29, 2016 at 10:27 PM, Panagiotis Atmatzidis
Post by Panagiotis Atmatzidis
Hello,
def collect
items = []
pages = 59
pool = Thread.pool(10)
mutex = Mutex.new
1.upto(pages) do |n|
pool.process {
params = {
:page => n,
:page_size => 1000
}
uri.query = URI.encode_www_form(params)
puts "Fetching from #{uri}"
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
request = Net::HTTP::Get.new(uri.request_uri)
begin
response = http.request(request)
response.is_a?(Net::HTTPSuccess) ? data = JSON.parse(response.body) : data = {}
rescue SocketError
@log.info(“Something happened on the way to heaven: #{e}"
end
begin
data.each do |d|
# ...mangle...
mutex.synchronize {
items.push(p) # avoid race condition
Where does p come from?

And why do you hold the mutex only for each chunk of data that you
fetch from a URL? This will lead to content of different pages to be
mixed in items. Is this what you want to do?
Post by Panagiotis Atmatzidis
}
end
rescue => e
puts "Trouble parsing event: #{e}"
end
}
end
pool.shutdown
File.open('sample-mutex.txt', 'w') {|f| f.write(items)}
end
The above version works but it’s not dynamic, depends on the number of
pages. I’ve tried implementing “break" as suggested by M. Kerwin in various
ways without success.
The condition I want to meet is: data.key?(“next”) # => false - When this
condition is met, I’d like to have to stop creating new threads while
allowing current threads to finish their work.
First of all, most likely you are not creating threads. They will be
created by the pool upfront or on demand - you cannot influence that
other than by shutting down the pool.

The only thing you could stop doing is place work in the pools inbound
queue. But this is unlikely to work the way you imagine because the
work is put in the queue _initially_ and most likely all the 59 tasks
are placed into the queue before even data comes in from fetching
those URLs. So by the time you detect that it's enough work all the
work is scheduled and will be processed - unless you somehow
immediately terminate the processing (e.g. with #shutdown!) which you
don't.

But if the number of URLs to process depends on the data downloaded
from those URLs then maybe doing this in parallel is not such a good
option.
Post by Panagiotis Atmatzidis
Is it true the Fibers are better suited for this work as Daniele suggested?
I don't know. First you should step back and tell us what you are
trying to achieve and what the purpose of all this is. Until then it's
really a lot guesswork and anything we can tell you based on guesswork
might as well be wrong.

There are a few things apart from those mentioned above:
- Your method does not have inputs, instead it uses instance variables.
- The method does not return anything meaningful. In terms of
modularity I'd rather have it return items and do the writing
elsewhere. This also helps with testing.
- For items you could use an instance of Queue instead of an Array plus a Mutex.
- If you really always need to write what you download you could push
results into another queue with a single thread which reads and
immediately writs to "sample-mutex.txt".

Cheers

robert
--
[guy, jim, charlie].each {|him| remember.him do |as, often| as.you_can
- without end}
http://blog.rubybestpractices.com/

Unsubscribe: <mailto:ruby-talk-***@ruby-lang.org?subject=unsubscribe>
<http://lists.ruby-lang.org/cgi-bin/mailman/options/ruby-talk
Panagiotis Atmatzidis
2016-03-30 17:48:09 UTC
Permalink
This post might be inappropriate. Click to display it.
Continue reading on narkive:
Loading...