LevSelector.com New York

Multi-Threading, Shared Memory, IPC, etc. (page under construction - but you may still find it very useful)
On This Page

- intro
- threads in java
- more_about_threads
- threads_in_perl
- cpp_threads



Intro ------------------------------

This page contains some info about concurrent programming: multi-threading, multi-processing, shared memory, semaphores, IPC (Inter-Process Communication). One application of these techniques is trading systems, especially high-frequency / low-latency algorithmic trading systems.

Grid / Cloud computing

Random notes
multi-processing vs. multi-threading
Critical section A piece of code that accesses (reads/writes to) a shared resource (memory) that must not be concurrently accessed by more than one thread (think of transaction locking in databases). Some synchronization mechanism is required at the entry and exit of the critical section to ensure exclusive use.
Application-level Critical Sections:

#include <pthread.h>

static pthread_mutex_t cs_mutex = PTHREAD_MUTEX_INITIALIZER;

void f() {
    pthread_mutex_lock( &cs_mutex );
    /* Do some thread-safe processing! */
    pthread_mutex_unlock( &cs_mutex );
}
Kernel-Level Critical Sections
Kernel-level critical sections are the root of the software lockout issue - when a CPU spends idle time waiting to enter kernel-level critical sections
(one process inside a critical section makes the others wait).
Solution - keep critical sections as small and short as possible.
In a multiprocessor system, most conflicts are kernel-level conflicts caused by access to kernel-level critical sections, so the idle wait periods they generate have a major impact on performance. This idle wait time reduces the average number of busy processors, and with it scalability and relative efficiency.
mutex - Mutual Exclusion algorithm (also an object used to negotiate execution order between threads)

Random Examples:

fine-grained flags, counters, or queues
Hardware solutions:
- disable interrupts
- test-and-set of a flag (spinlock / busy-wait)
Similar atomic multiple-operation instructions, e.g., compare-and-swap,
are commonly used for lock-free manipulation of linked lists and other data structures.
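As a sketch of how compare-and-swap gets used, Java exposes it through java.util.concurrent.atomic. This is a minimal example, not taken from this page - the class name and counter are invented - but the retry loop is the standard CAS pattern: read, compute, swap only if nobody else changed the value in between.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Lock-free increment via compare-and-swap (AtomicInteger.compareAndSet).
public class CasCounter {
    static final AtomicInteger counter = new AtomicInteger(0);

    static void add(int delta) {
        int old;
        do {
            old = counter.get();                             // read current value
        } while (!counter.compareAndSet(old, old + delta));  // retry if another thread raced us
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[4];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> { for (int j = 0; j < 1000; j++) add(1); });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println("counter = " + counter.get());    // always 4000 - no updates lost
    }
}
```

No mutex is taken anywhere; correctness comes entirely from the atomic compare-and-swap retry loop.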
Software solutions:
- Dekker's algorithm (set intention flag - and wait for other intention flags to fall)
guarantees mutual exclusion, freedom from deadlock, and freedom from starvation
limited to 2 processes, uses "Busy waiting".
- Peterson's algorithm - a simpler two-process algorithm using intent flags plus a "turn" variable
- Lamport's bakery algorithm (uses numbering: sequential + thread Id)
- The black-white bakery algorithm
- Semaphores (lock/unlock, or a counting semaphore)
- Monitors - object intended to be used safely by more than one thread
(methods are executed with mutual exclusion).
waiting, signaling (notify)
Monitor can implement a simple semaphore, etc.
- Message passing
- Tuple space - a hash (keys/values) accessed concurrently
Object Spaces (shared objects)
JavaSpaces (a service providing distributed objects)
Shared memory communication
Message passing communication
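As noted above, a monitor can implement a simple semaphore. Here is a minimal Java sketch - class and method names are invented for this example - where the monitor is a synchronized object and waiting/signaling is done with wait()/notify():

```java
// A counting semaphore built from a Java monitor (synchronized methods + wait/notify).
public class MonitorSemaphore {
    private int permits;

    public MonitorSemaphore(int permits) { this.permits = permits; }

    public synchronized void down() throws InterruptedException {
        while (permits == 0) wait();   // block until a permit is available
        permits--;
    }

    public synchronized void up() {
        permits++;
        notify();                      // wake one waiting thread, if any
    }

    public synchronized int available() { return permits; }

    public static void main(String[] args) throws InterruptedException {
        MonitorSemaphore s = new MonitorSemaphore(1);
        s.down();
        System.out.println("holding the permit, available = " + s.available()); // 0
        s.up();
        System.out.println("released, available = " + s.available());           // 1
    }
}
```

The while (not if) around wait() matters: a woken thread must re-check the condition, since another thread may have grabbed the permit first.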

Threads in Java

To start and run a thread you usually use 2 objects:
   - one of your own class (make sure it has a run( ) method - that means it implements Runnable interface)
   - and one of type Thread (it has start( ) method)

You can use just one object if it extends Thread (because Thread has both start( ) and run( ) methods)

Here are the steps with 2 objects:
Step 1: You create a class (OurClass) extending whatever you like - and make sure it has a run( ) method.
Step 2: You create an instance of this class.
Step 3: You create an instance of the Thread class (with your runnable object as a parameter).
Step 4: You call the start( ) method of this instance.

For example:
 
import java.io.IOException; // this is for System.in.read( )

public class OurClass extends Object implements Runnable {
  static int cc=0;
  String ThreadName;

  public OurClass(String name) {  // constructor name must match the class name
    ThreadName = name;
    System.out.println("Constructor " + name);
  }

  public void run(){
   while(++cc <= 10) {
     System.out.println("Run " + ThreadName + "  " + cc);
     // try to run as is, or try to uncomment one of the following 2 lines
     // try { Thread.sleep(500); } catch (InterruptedException e) {} 
     // Thread.yield(); 
   }
   System.out.print("Finished, press ENTER to exit:");
   try { System.in.read(); } catch (IOException e) {}
  }

  public static void main(String[] args) {
    (new Thread(new OurClass("first "))).start();
    (new Thread(new OurClass("second"))).start();
  }
  
}

more about threads


class java.lang.Thread - extends Object implements Runnable
Every thread has a priority (1..10) = static int MIN_PRIORITY, NORM_PRIORITY, MAX_PRIORITY
The new thread has its priority initially set equal to the priority of the creating thread.
Each thread may or may not also be marked as a daemon.

Constructors:
Thread() 
Thread(Runnable target) 
Thread(Runnable target, String name) 
Thread(String name) 
Thread(ThreadGroup group, Runnable target) 
Thread(ThreadGroup group, Runnable target, String name) 
Thread(ThreadGroup group, String name) 

Methods:
  static int activeCount() - Returns the current number of active threads in this thread's thread group. 
  void checkAccess() - Determines if the currently running thread has permission to modify this thread. 
  int countStackFrames() - Deprecated. The definition of this call depends on suspend(), which is deprecated. Further, the results of this call were never well-defined. 
  static Thread currentThread() - Returns a reference to the currently executing thread object. 
  void destroy() - Deprecated. Originally intended to destroy this thread without any cleanup; never actually implemented. 
  static void dumpStack() - Prints a stack trace of the current thread. 
  static int enumerate(Thread[] tarray) - Copies into the specified array every active thread in this thread's thread group and its subgroups. 
  ClassLoader getContextClassLoader() - Returns the context ClassLoader for this Thread. 
  String getName() - Returns this thread's name. 
  int getPriority() - Returns this thread's priority. 
  ThreadGroup getThreadGroup() - Returns the thread group to which this thread belongs. 
  void interrupt() - Interrupts this thread. 
  static boolean interrupted() - Tests whether the current thread has been interrupted. 
  boolean isAlive() - Tests if this thread is alive. 
  boolean isDaemon() - Tests if this thread is a daemon thread. 
  boolean isInterrupted() - Tests whether this thread has been interrupted. 
  void join() - Waits for this thread to die. 
  void join(long millis) - Waits at most millis milliseconds for this thread to die. 
  void join(long millis, int nanos) - Waits at most millis milliseconds plus nanos nanoseconds for this thread to die. 
  void resume() - Deprecated. This method exists solely for use with suspend(), which has been deprecated because it is deadlock-prone. 
  void run() - If this thread was constructed using a separate Runnable run object, then that Runnable object's run method is called; otherwise, this method does nothing and returns. 
  void setContextClassLoader(ClassLoader cl) - Sets the context ClassLoader for this Thread. 
  void setDaemon(boolean on) - Marks this thread as either a daemon thread or a user thread. 
  void setName(String name) - Changes the name of this thread to be equal to the argument name. 
  void setPriority(int newPriority) - Changes the priority of this thread. 
  static void sleep(long millis) - Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds. 
 static void sleep(long millis, int nanos) - Causes the currently executing thread to sleep (cease execution) for the specified number of milliseconds plus the specified number of nanoseconds. 
  void start() - Causes this thread to begin execution; the Java Virtual Machine calls the run method of this thread. 
  void stop() - Deprecated as unsafe.
  void stop(Throwable obj) - Deprecated as unsafe.
  void suspend() - Deprecated as deadlock-prone.
  String toString() - Returns a string representation of this thread, including the thread's name, priority, and thread group. 
 static void yield() - Causes the currently executing thread object to temporarily pause and allow other threads to execute. 

Methods inherited from class java.lang.Object: 
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
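A couple of the methods listed above (interrupt(), join(), sleep()) in action - a minimal sketch; the class name and the wasInterrupted flag are invented for this demo:

```java
// interrupt() breaks a blocking sleep(); join() waits for the thread to die.
public class JoinInterruptDemo {
    static volatile boolean wasInterrupted = false; // set by the worker when its sleep is broken

    public static void main(String[] args) throws InterruptedException {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000);       // would sleep for a minute...
            } catch (InterruptedException e) {
                wasInterrupted = true;      // ...but interrupt() aborts the sleep
            }
        });
        sleeper.start();
        sleeper.interrupt();  // sets the interrupt flag; sleep() throws InterruptedException
        sleeper.join();       // wait for this thread to die
        System.out.println("wasInterrupted = " + wasInterrupted); // true
    }
}
```

Note that sleep() checks the interrupt flag on entry, so it throws even if interrupt() lands before the sleep actually starts.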

Threaded Program Models:

Boss/Worker - one boss thread and several 'worker' threads. This model is common in GUI and server programs, where the main thread (boss) waits for some event and then passes that event to the appropriate worker thread for processing.

Work Crew - several threads work in parallel. This is usually used to distribute the load across different processors (or in rendering engines).

Pipeline (assembly line) - the task is divided into steps. Threads are assigned to different steps. Each thread does one thing to each piece of data and passes the results to the next thread in line. Threads can run in parallel - you can distribute tasks between different CPUs for greater performance.
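The pipeline model above can be sketched in Java with a BlockingQueue between stages. The stage logic and the -1 shutdown marker below are made up for illustration: stage 1 produces doubled numbers, stage 2 consumes them.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Two-stage pipeline: main thread produces, a second thread consumes via a queue.
public class Pipeline {
    static final List<Integer> results = new CopyOnWriteArrayList<>(); // stage-2 output

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(16);
        Thread stage2 = new Thread(() -> {
            try {
                int v;
                while ((v = q.take()) != -1)   // -1 is a made-up shutdown marker
                    results.add(v);            // stage 2: consume
            } catch (InterruptedException ignored) { }
        });
        stage2.start();
        for (int i = 1; i <= 3; i++) q.put(i * 2);  // stage 1: produce doubled numbers
        q.put(-1);                                  // tell stage 2 to stop
        stage2.join();
        System.out.println("results = " + results); // [2, 4, 6]
    }
}
```

The queue is the only shared state; each stage can run on its own CPU, which is exactly the appeal of the pipeline model.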

Threads in Perl (from docs by Dan Sugalski):


- Starting threads:

use threads;

$thr = threads->new(\&sub1, @params1);
$thr = threads->new(\&sub2, @params2);

sub sub1 {
my @params = @_;
print "In the thread, sub1, params: ", join(", ", @params), "\n";
}

sub sub2 {
my @params = @_;
print "In the thread, sub2, params: ", join(", ", @params), "\n";
}

- Waiting For A Thread To Exit

use threads;

$thr = threads->new(\&sub1);
@ReturnData = $thr->join;
print "Thread returned @ReturnData";

sub sub1 { return "Fifty-six", "foo", 2; }

- Detach a thread and forget about it - let it run by itself

use threads;

$thr = threads->new(\&sub1);
$thr->detach;
sub sub1 { ... }

- Sharing data between threads (by default no data is shared, you should explicitly indicate what is shared)

use threads;
use threads::shared;
my $foo : shared = 1;
my $bar = 1;

threads->new(sub { $foo++; $bar++ })->join;

print "$foo\n"; #prints 2 since $foo is shared
print "$bar\n"; #prints 1 since $bar is not shared

- sharing arrays & hashes

use threads;
use threads::shared;
my $var = 1;
my $svar : shared = 2;
my %hash : shared;

... create some threads here ...

$hash{a} = 1; # all threads see exists($hash{a}) and $hash{a} == 1
$hash{a} = $var;   # okay - copy-by-value: same effect as previous
$hash{a} = $svar;  # okay - copy-by-value: same effect as previous
$hash{a} = \$svar; # okay - a reference to a shared variable
$hash{a} = \$var;  # This will die - can't store unshared data in a shared hash
delete $hash{a};   # okay - all threads will see !exists($hash{a})

- race condition

use threads;
use threads::shared;
my $a : shared = 1;

$thr1 = threads->new(\&sub1);
$thr2 = threads->new(\&sub2);
$thr1->join;
$thr2->join;
print "$a\n";
sub sub1 { my $foo = $a; $a = $foo + 1; }
sub sub2 { my $bar = $a; $a = $bar + 1; }

__END__
value of $a may be 2 or 3. This depends on which thread executes first (which depends on scheduling and thread implementation).
Race conditions are caused by unsynchronized access to shared data. Even $a += 5 or $a++ are not guaranteed to be atomic.
Here is another example:

use threads;
my $a : shared = 2;
my $b : shared;
my $c : shared;
my $thr1 = threads->create(sub { $b = $a; $a = $b + 1; });
my $thr2 = threads->create(sub { $c = $a; $a = $c + 1; });
$thr1->join;
$thr2->join;

__END__
Both threads have access to $a.
Threads can be executed in any order and can be interrupted at any point.
At the end, $a could be 3 or 4, and both $b and $c could be 2 or 3.

Synchronization and control

- lock() - puts a lock on a shared variable (scalar, array, hash). The lock can be released only by the same thread - and is released automatically when the thread exits the outermost block that contains the lock() call.

use threads;
use threads::shared;
my $total : shared = 0;

sub calc {
  for (;;) {
    my $result;
    # (... do some calculations and set $result ...)
    {
      lock($total); # block until we obtain the lock
      $total += $result;
    } # lock implicitly released at end of scope
    last if $result == 0;
  }
}

my $thr1 = threads->new(\&calc);
my $thr2 = threads->new(\&calc);
my $thr3 = threads->new(\&calc);
$thr1->join;
$thr2->join;
$thr3->join;
print "total=$total\n";

Note: Locking an array will not block subsequent locks on array elements, just lock attempts on the array itself.
Note: There is no unlock() function - the only way to unlock a variable is to let the lock go out of scope.
Note: Locks are recursive, which means it's okay for a thread to lock a variable more than once. The lock will last until the outermost lock() on the variable goes out of scope. For example:

my $x : shared;
doit();
sub doit {
   {
      {  lock($x); # wait for lock
          lock($x); # NOOP - we already have the lock
         {  lock($x); # NOOP
             {  lock($x); # NOOP
                 lockit_some_more();
              }
         }
      } # *** implicit unlock here ***
   }
}

sub lockit_some_more {
  lock($x); # NOOP
} # nothing happens here


A lock can be used to guard the data in the variable. You can also have this variable created for the sole purpose of being locked - to serve as a basic semaphore.

A Thread Pitfall: Deadlocks

use threads;
my $a : shared = 4;
my $b : shared = "foo";
my $thr1 = threads->new(sub { lock($a); sleep 20; lock($b); });
my $thr2 = threads->new(sub { lock($b); sleep 20; lock($a); });

This program will probably hang until you kill it, because the two threads lock $a and $b respectively - and then wait forever for each other to release the other lock. This condition is called a deadlock, and it occurs whenever two or more threads are trying to get locks on resources that the others own.
One way to handle this problem is to have all threads always acquire locks in the exact same order. If, for example, you lock variables $a, $b, and $c, always lock $a before $b, and $b before $c. It is also best to hold locks for as short a time as possible, to minimize the risk of deadlock.
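The lock-ordering rule works the same way in any language; here is a minimal Java sketch (class, lock, and counter names invented). Because both threads take lockA strictly before lockB, the circular wait from the Perl example above cannot form.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Deadlock avoidance by fixed lock ordering: A before B, always.
public class LockOrder {
    static final Object lockA = new Object();
    static final Object lockB = new Object();
    static final AtomicInteger completed = new AtomicInteger(0);

    static void work() {
        synchronized (lockA) {       // every thread locks A first...
            synchronized (lockB) {   // ...then B, so no lock cycle can form
                completed.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(LockOrder::work);
        Thread t2 = new Thread(LockOrder::work);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("completed = " + completed.get()); // always 2 - no deadlock
    }
}
```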

Queues: Passing Data Around

A queue is a special thread-safe object that lets you put data in one end and take it out the other without having to worry about synchronization issues. Queues are pretty straightforward, and look like this:

use threads;
use Thread::Queue;
my $DataQueue = Thread::Queue->new;
$thr = threads->new(sub {
    while ($DataElement = $DataQueue->dequeue) {
        print "Popped $DataElement off the queue\n";
    }
});
$DataQueue->enqueue(12);
$DataQueue->enqueue("A", "B", "C");
$DataQueue->enqueue(\$thr);
sleep 10;
$DataQueue->enqueue(undef);
$thr->join;

You create the queue with Thread::Queue->new. Then you can add lists of scalars onto the end with enqueue(), and pop scalars off the front of it with dequeue(). A queue has no fixed size, and can grow as needed to hold everything pushed on to it. If a queue is empty, dequeue() blocks until another thread enqueues something. This makes queues ideal for event loops and other communications between threads.

Semaphores: Synchronizing Data Access

Semaphores - objects that behave like locks. There are basic semaphores (up/down = lock/unlock = 0/1) and advanced ones (acting more like counters).

Basic semaphores - set it to 0 or 1 using two methods: down() and up().
Calls to down() will block if the semaphore's current count would decrement below zero.
Example:

use threads;
use Thread::Semaphore;

my $semaphore = Thread::Semaphore->new;
my $GlobalVariable : shared = 0;

$thr1 = threads->new(\&sample_sub, 1);
$thr2 = threads->new(\&sample_sub, 2);
$thr3 = threads->new(\&sample_sub, 3);

sub sample_sub {
   my $SubNumber = shift @_;
   my $TryCount = 10;
   my $LocalCopy;
   sleep 1;
   while ($TryCount--) {
      $semaphore->down;
      $LocalCopy = $GlobalVariable;
      print "$TryCount tries left for sub $SubNumber (\$GlobalVariable is  $GlobalVariable)\n";
      sleep 2;
      $LocalCopy++;
      $GlobalVariable = $LocalCopy;
      $semaphore->up;
   }
}

$thr1->join;
$thr2->join;
$thr3->join;

__END__
The three invocations of the subroutine all run concurrently.
The semaphore, though, makes sure that only one thread is accessing the global variable at a time.

Advanced Semaphores - the same Semaphore object, but using its ability to act as a counter by passing a value to the up() and down() methods, for example up(5).
Semaphores with counters greater than one can be useful for establishing quotas. For example, you can initialize a semaphore to the maximum number of concurrent I/O requests (or open files) that you don't want exceeded.
Another meaningful use of increments/decrements > 1 can be a GUI-driven program.
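The quota idea maps directly onto a counting semaphore in Java as well (java.util.concurrent.Semaphore). In this sketch - names and the permit count are invented - at most 2 of 5 threads can hold the "I/O" quota at once:

```java
import java.util.concurrent.Semaphore;

// Quota via a counting semaphore: at most 2 concurrent holders.
public class QuotaDemo {
    static final Semaphore quota = new Semaphore(2);

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            try {
                quota.acquire();   // like down(): blocks when the quota is used up
                // ... do the rate-limited work (I/O request, open file, ...) here ...
                quota.release();   // like up(): returns the permit
            } catch (InterruptedException ignored) { }
        };
        Thread[] ts = new Thread[5];
        for (int i = 0; i < ts.length; i++) { ts[i] = new Thread(task); ts[i].start(); }
        for (Thread t : ts) t.join();
        System.out.println("permits left = " + quota.availablePermits()); // back to 2
    }
}
```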

Several Thread Utility Routines

# Loop through all the threads

foreach $thr (threads->list) {
   # Don't join the main thread or ourselves
   if ($thr->tid && !threads::equal($thr, threads->self)) { $thr->join; }
}

Note: if the main program ends while other threads are still running, perl will warn and die, because it cannot clean up after itself while other threads are running.

Different implementations of threads

  • User-mode threads - live entirely within a program. The big disadvantage is that, since the OS knows nothing about threads, if one thread blocks they all do. Typical blocking activities include most system calls, most I/O, and things like sleep().
  • Kernel threads - the OS knows about kernel threads, and makes allowances for them. The main difference between a kernel thread and a user-mode thread is blocking. With kernel threads, things that block a single thread don't block other threads. Threads that block performing I/O, for example, won't block threads that are doing other things. Each process still has only one thread running at once, though, regardless of how many CPUs a system might have. Since kernel threading can interrupt a thread at any time, they will uncover some of the implicit locking assumptions you may make in your program. For example, something as simple as $a = $a + 2 can behave unpredictably with kernel threads if $a is visible to other threads, as another thread may have changed $a between the time it was fetched on the right hand side and the time the new value is stored.
  • Multiprocessor kernel threads - the OS may schedule two or more threads to run simultaneously on different CPUs. This can give a serious performance boost.

Note that different OSes (and different thread implementations for a particular OS) allocate CPU cycles to threads in different ways.

  • In "Cooperative multitasking" systems one thread can starve all the others for CPU time if it so chooses. A thread gives up control if it calls a yield function, or does something that would cause it to block, such as perform I/O.
  • In "Preemptive multitasking" systems (modern systems) one thread usually won't monopolize the CPU, because system interrupts threads at regular intervals and decides which thread should run next.
  • On some systems, there can be cooperative and preemptive threads running simultaneously. For example, threads running with realtime priorities often behave cooperatively, while threads running at normal priorities behave preemptively.
-

Performance considerations - Thread creation can be quite expensive, because a complete copy of all the variables and data of the parent thread has to be taken. So it is better to use a relatively small number of long-lived threads, all created fairly early on - before the base thread has accumulated too much data. Also note that under the current implementation, shared variables use a little more memory and are a little slower than ordinary variables.
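In Java the same "small number of long-lived threads" advice is usually realized with a thread pool. A minimal sketch (class and counter names invented): six tasks run on just two reusable threads, so no per-task thread creation cost is paid.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// A fixed-size thread pool: tasks are queued onto 2 long-lived worker threads.
public class PoolDemo {
    static final AtomicInteger done = new AtomicInteger(0); // counts finished tasks

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2); // 2 reusable threads
        for (int i = 0; i < 6; i++)
            pool.submit(done::incrementAndGet);  // cheap: no thread created per task
        pool.shutdown();                         // accept no new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("done = " + done.get()); // 6
    }
}
```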

Process-scope Changes - when one thread calls chdir(), the working directory of all the threads changes. chroot() changes the root directory of all the threads, and no thread can undo it. Further examples of process-scope changes include umask() and changing uids/gids.

Don't even think of mixing fork() and threads. Why? For example, some UNIX systems copy all the current threads into the child process, while others copy only the thread that called fork(). You have been warned!

Mixing signals and threads should not be attempted. Implementations are platform-dependent, and even the POSIX semantics may not be what you expect (and Perl doesn't even give you the full POSIX API).

Thread-Safety of System Libraries - Examples of calls which are not thread-safe: localtime(), gmtime(), get{gr,host,net,proto,serv,pw}*(), readdir(), rand(), and srand() -- in general, calls that depend on some global external state. If OS has thread-safe variants of such calls, they will/may be used. Beyond that, Perl is at the mercy of the thread-safety or -unsafety of the calls. Please consult your C library call documentation. On some platforms the thread-safe library interfaces may fail if the result buffer is too small. You can recompile Perl with PERL_REENTRANT_MAXSIZE defined to the maximum number of bytes you will allow.

C++ threads

#include <pthread.h>
#include <cstdio>
#include <cstdlib>
#define NUM_THREADS     5
// to compile:    g++ -pthread test_pthreads.cpp

void *PrintHello(void *threadid) {
   long tid;
   tid = (long)threadid;
   printf("Hello World! It's me, thread #%ld!\n", tid);
   pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
  pthread_t threads[NUM_THREADS];
  int rc;
  long t;
  for(t=0; t<NUM_THREADS; t++){
    printf("In main: creating thread %ld\n", t);
    rc = pthread_create(&threads[t], NULL, PrintHello, (void *)t);
    if (rc){
      printf("ERROR; return code from pthread_create() is %d\n", rc);
      exit(-1);
    }
  }
  pthread_exit(NULL);
}
