小林 未知数の備忘録
Fortran

Fortranメモ

  • 文字と数値の変換

    文字→数値
                    character(len=2) :: ca
                    integer :: a
                    ca='10'
                    read(ca,*) a
    数値→文字
                    character(len=2) :: ca
                    integer :: a
                    a=10
                    write(ca,*) a
  • 文字数を指定せずに (スペース無しで) 文字列を書き出す

                    character(len=100) :: ca
                    ca='abcdefg'
                    write(*,'(a)') trim(ca)
  • 桁数を指定せずに (スペース無しで) 整数を書き出す

                    integer :: i
                    i=10000
                    write(*,'(i0)') i
  • 整数を書き出す際にゼロ埋めをする

    7桁で表示し6桁までゼロ埋め表示する
                    integer :: i
                    i=12345
                    write(*,'(i7.6)') i
  • 自動で文字数を変更する

    allocatableを付けると文字列が変更されたときに文字数も自動的に変更される
                    character(:),allocatable :: ca
                    ca='abcdefg'
                    write(*,*) len(ca)
                    ca='abcdefghijklmn'
                    write(*,*) len(ca)
  • 一度closeしたファイルに書き足す

    open文でposition='append'とする。ただしformattedの場合のみ有効
  • 数値の最大値と最小値

  • 最小最大有効数字
    integer(1)-128127
    integer(2)-3276832767
    integer(4)-21474836482147483648
    integer(8)-92233729368547758089223372936854775808
    real(4)1.17549435e-383.40282347e388
    real(8)2.2250738585072013d-3081.7976931348623158d30816
    符号付き整数から符号なし1バイト整数への変換にはcharを用いれば良い。
  • デバッグオプション

    最適化を一切行わない
                    ifort -O0 hoge.f90
    配列の領域外使用など、メモリの不正使用などによるsegmentation errorのチェック
                    ifort -CB -traceback hoge.f90
    ゼロ割や値のオーバーフローのチェック
                    ifort -fpe0 -traceback hoge.f90
    上記のオプションをつけて実行すると詳しいエラーメッセージが表示されて計算が止まる。
  • 最適化オプション

    自動ベクトル化。
                    ifort -xHost hoge.f90

    サブルーチンの呼び出し部分をすべて埋め込みながらコンパイル。
                    ifort -ipo hoge.f90
    サブルーチンが非常に多い場合、若干速くなることが望める(注意:-ipoオプションはあまりおすすめしない。サブルーチンの実行の順番が内部で変わり、望む結果が得られない場合がある。確かに-ipoで速くなるときがあるが、-ipoのバグに悩まされて余計な時間を使うよりは諦めたほうが良いと思われる)
    演算の最適化
                    ifort -O1 hoge.f90
                    ifort -O2 hoge.f90
                    ifort -O3 hoge.f90
    のどれか。O1→O2→O3の順に最適化レベルは上がるが、O3がO2よりも必ず早くなるとは限らない(どれだけ良いプログラムを書いているかに依存する)。またプログラムの書き方によっては最適化レベルを上げることによるバグが生じることがある。デフォルトはO2
  • 割り算の高速化

                    ifort -no-prec-div hoge.f90
    加減乗除の中では割り算が最も時間がかかる。そこで割り算だけ精度を下げて高速化する(できるだけ割り算は減らした方がいい。例えば2で割る代わりに0.5を掛けるとか、整数であればビットシフトを用いるとか)。
  • Fortranの場合2次元以上の配列を用意したとき、実際に用意されるメモリの順番は内側が先である。例えば

                    real(8) :: a(3,3)
    という3 x 3の2次元配列を定義すると、確保される実際のメモリの順番は
                    a(1,1)
                    a(2,1)
                    a(3,1)
                    a(1,2)
                    a(2,2)
                    a(3,2)
                    a(1,3)
                    a(2,3)
                    a(3,3)
    となる(juliaと同じで、c/c++、java、pythonと逆)。do文を用いて配列を操作するようなことがあった場合、メモリの順番に従って操作するようにした方が速い。例えば
                    do i=1,3
                      do j=1,3
                        a(i,j)=i+j
                      enddo
                    enddo
                    do j=1,3
                      do i=1,3
                        a(i,j)=i+j
                      enddo
                    enddo
    は行なっている演算は同じであるが、後者はメモリの順番に従って操作しているので前者よりも速くなる。
  • 階段関数\( \theta(x) \)

                    0.5d0*(dsign(1.d0,x)+1.d0)
    ただし、if文を使った方が速い場合も多々ある。一行でスッキリ書けるというメリットくらいかも。
  • 組み込み関数atan2(y,x)=atan(y/x)

    atan2(x,y)でないので注意 (gnuplotと同じでMathematicaと逆)
  • データの書き出し・読み取りは案外、数値計算のボトルネックになっていることが多いので、ここを改善すると驚くほど計算が速くなったりする。例えば配列データの書き出しは

                    open(10,file='hoge.dat')
                    do j=1,3
                      do i=1,3
                        write(10,*) a(i,j)
                      enddo
                    enddo
                    close(10)
    のようにデータを1つずつ書き出すのではなく
                    open(10,file='hoge.dat')
                    write(10,*) ((a(i,j),i=1,3),j=1,3)
                    close(10)
    のようにまとめて書き出せるのであれば書き出すと速い。
  • データファイルには人間が読める形式(ascii : アスキー)と、PCは読めるが(厳しい修行に耐え抜いた人間以外の)人間には読めない形式(binary : バイナリ)があり、人間が読む必要がないのであれば(例 : 別のプログラムへデータを引き渡すだけ)、バイナリ形式でデータを書き出すと、書き出す時間・ファイルサイズともに驚くほど小さくなる(読み込みもバイナリの方が圧倒的に速い)。ちなみにAVS/ExpressやPythonのNumpyはバイナリ形式のデータファイルを読むことができる。ある程度修行を積めばgnuplotにもバイナリ形式のデータファイルを読み込ませることができるらしい。また、Paraviewはバイナリデータを読み込ませる場合にアスキーを混ぜるという変則技が必要。バイナリ形式でデータを書き出す例

                    open(10,file='hoge',form='binary')
                    write(10) ((a(i,j),i=1,3),j=1,3)
                    close(10)
    とすれば良い。拡張子は付けないか、付けるとすればdatとは別の拡張子(例えばbin?)を付けた方が良い。
  • unformatted,binaryに関して

    • unformatted : access="sequential" (デフォルト) と併用すると、write文でデータを書き出す度に4バイトのヘッダとフッタが入る。何だかformattedのときにwrite文を書く度に改行コードが入るのと似ている。まるで使い物にならない。

    • ちなみにformattedで改行コードが入らないようにするには

                          write(10,*,advance='no')
      としておく。 binary : write文でヘッダもフッタも入らないので使いやすい。ただしIntelコンパイラ以外には実装されていないみたい。
    • Intelコンパイラ以外でどうしてもunformattedを使わなければいけない場合はaccess='direct', recl=バイト数かaccess='stream'を使う。前者はreclで指定したバイト数を1グループとして

                          write(10,rec=5)	
      とすればファイルの先頭から(reclの指定数)×5バイトの場所に書き出される。グループの概念が無いような場合にはstreamを用いて
                          write(10,pos=5)	
      とすればファイルの先頭から5バイト目の場所に書き出される。ちなみにformattedの場合にはバイトではなく文字数になる (改行も含める) 。
  • 計算機は有限桁の数値しか扱えないので加減演算の間で桁落ちが生じることがある。例えば(a+b)+cとa+(b+c)は必ずしも等しくない。桁落ちが深刻になるのはaとbが近い値でa-bの演算をするときである。問題となる2例を示す。

    1つ目は分散
    \( \displaystyle \qquad \frac{1}{N} \sum (x - \langle x \rangle)^2 \)
    を計算する際に
    \( \displaystyle \qquad \frac{1}{N} \sum x^2 - \langle x \rangle^2 \)
    とすると、大きな桁落ちが生じる可能性がある。この桁落ちを回避するには前者の方法で計算するか、メモリに問題がある場合には\(\langle x \rangle\)の予想値\(x_0\)を設定し
    \( \displaystyle \qquad \frac{1}{N} \sum (x - x_0)^2 - (x_0 - \langle x \rangle)^2 \)

    とすればよい。もう1つの例は2次方程式で公式
    \( \displaystyle \qquad \frac{- b + \sqrt{b^2 - 4 a c}}{2 a}, \quad \frac{- b - \sqrt{b^2 - 4 a c}}{2 a} \)
    を使って解を求める際、\(4 a c\)の絶対値が小さいときである。この場合\(b \geq 0\)であれば前者に、\(b \leq 0\)であれば後者に大きな桁落ちが生じる。これを回避するには、桁落ちが生じる可能性のある方に分子の有理化を行って
    \( \displaystyle \qquad \frac{- 2 c}{b + \sqrt{b^2 - 4 a c}}, \quad \frac{- 2 c}{b - \sqrt{b^2 - 4 a c}} \)
    とすれば良い
  • 数値計算やファイルの読み書きでストライドは極力使わないこと。例えばフーリエ変換のライブラリを用いて複数のフーリエ変換をする場合

                    program main
                      use mkl_dfti
                      implicit none
                      type(dfti_descriptor),pointer :: plandft
                      integer,parameter :: n=1024
                      integer :: statdft
                      complex(8) :: zf(n*5)
                      statdft=dfticreatedescriptor(plandft,dfti_double,dfti_complex,1,n)
                      statdft=dftisetvalue(plandft,dfti_number_of_transforms,5)
                      statdft=dftisetvalue(plandft,dfti_input_distance,1)
                      statdft=dftisetvalue(plandft,dfti_output_distance,1)
                      statdft=dftisetvalue(plandft,dfti_input_strides,(/0,5/))
                      statdft=dftisetvalue(plandft,dfti_output_strides,(/0,5/))
                      statdft=dfticommitdescriptor(plandft)
                      statdft=dfticomputeforward(plandft,zf)
                      stop
                    end program main
    のようにストライドを使うよりも
                    program main
                      use mkl_dfti
                      implicit none
                      type(dfti_descriptor),pointer :: plandft
                      integer,parameter :: n=1024
                      integer :: statdft
                      complex(8) :: zf(n*5)
                      statdft=dfticreatedescriptor(plandft,dfti_double,dfti_complex,1,n)
                      statdft=dftisetvalue(plandft,dfti_number_of_transforms,5)
                      statdft=dftisetvalue(plandft,dfti_input_distance,n)
                      statdft=dftisetvalue(plandft,dfti_output_distance,n)
                      statdft=dfticommitdescriptor(plandft)
                      statdft=dfticomputeforward(plandft,zf)
                      stop
                    end program main
    として、フーリエ変換を行うデータをひとかたまりにしておいた方が断然速い (第2,第3次元のみフーリエ変換したいとか言う場合はストライドが必須になるが) 。AVS/Expressなどでデータを読み込む際にもストライドはなるべく使わずひとかたまりでデータを読み込むこと。

Torque-Mauiとの連携

  • OpenPBSでppnで指定した数を全て自動でOpenMP並列に割り当てる

                    export OMP_NUM_THREADS=$PBS_NUM_PPN
  • スクリプト内で指定した環境変数は-vオプションで指定できる。例えばスクリプト内で

                    #PBS -o out-$test1-$test2.log
                    ./$out.exe $test1 $test2
    などと書いておき、
                    qsub ./hoge.sh -v out=test,test1=1,test2=2
    とすれば
                    ./test.exe 1 2
    が実行され、out-1-2.logの名前のログファイルができる。

OpenMP

  • do文の並列化

    完全に独立なdo文は簡単に並列化できる
                    real(8) :: a(1:100),b(1:100),c(1:100)
                    省略
                    !$OMP PARALLEL DO
                    do i=1,100
                      a(i)=b(i)*c(i)
                    enddo
                    !$OMP END PARALLEL DO
    のような文章を書く。!$OMP PARALLEL DOから!$OMP END PARALLEL DOの間にあるdo文が並列化される。例えば4コア並列をさせた場合1<i<25は1番目、26<i<50は2番目、51<i<75は3番目、76<i<100は4番目のコアが計算を行い、最後!$OMP END PARALLEL DOにおいてそれぞれのaのデータが統合される。
                    real(8) :: a(1:100,1:100),b(1:100,1:100),c(1:100,1:100)
                    省略
                    !$OMP PARALLEL DO
                    do j=1,100
                      do i=1,100
                        a(i,j)=b(i,j)*c(i,j)
                      enddo
                    enddo
                    !$OMP END PARALLEL DO
    のようにdo文が入れ子になっている場合は外側のdo文に対して(この場合はj)、並列処理が行われる。
    注意したいのが、独立でないdo文を並列化するとおかしな結果が出ることである。例えば
                    real(8) :: a(1:100),b(1:99)
                    省略
                    !$OMP PARALLEL DO
                    do i=1,99
                      a(i+1)=a(i)+b(i)
                    enddo
                    !$OMP END PARALLEL DO
    と書くと、a(i+1)の計算に1つ前のiで計算した結果a(i)を用いているが、例えば4コア並列のときのa(26)はa(25)の結果を用いており、a(26)を計算するのは2番目のコア、a(25)は1番目のコア、と別のコアが計算しているのでa(26)は正しい結果が得られない(a(26)が正しくなければ以降は全て正しくない)。
    別の例として、有名な和の計算
                    real(8) :: s,a(1:100)
                    省略
                    s=0.d0
                    !$OMP PARALLEL DO
                    do i=1,100
                      s=s+a(i)
                    enddo
                    !$OMP END PARALLEL DO
    と書くと、異なるiの領域のa(i)の和をそれぞれのコアが独立に計算することになり(sに対するメモリ領域はそれぞれのコアが独立に用意し、共有できない)、最後異なるコアの異なるsを統合することができず、おかしな結果が得られる。この場合には下の配列式の例を用いるか、ATOMICを用いて
                    real(8) :: s,a(1:100)
                    省略
                    a=0.d0
                    !$OMP PARALLEL DO
                    do i=1,100
                      !$OMP ATOMIC
                      s=s+a(i)
                    enddo
                    !$OMP END PARALLEL DO
    とすればよい(!$OMP ATOMICを指定した次の行に対して、メモリの変更を共通で行う。速度、安定性において配列式の使用が推奨される。配列式で書けない場合のみATOMICを用いればよい)。
  • 配列式の並列化

    配列式も簡単に並列化できる。例えば
                    real(8) :: a(1:100),b(1:100),c(1:100)
                    省略
                    !$OMP WORKSHARE
                    a(1:100)=b(1:100)+c(1:100)
                    !$OMP END WORKSHARE
    のように!$OMP WORKSHAREから!$OMP END WORKSHAREの間の配列式が並列化される。配列関数も並列化できて、例えば
                    real(8) :: s,a(1:100)
                    省略
                    !$OMP WORKSHARE
                    s=sum(a(1:100))
                    !$OMP END WORKSHARE
    と書くと、それぞれのコアでaの異なる範囲の和を計算し、!$OMP END WORKSHAREにおいて計算された和の和をとることでaの和が得られる。
  • OpenMPを用いたプログラムをコンパイルするにはコンパイルオプション-openmpを用いる。

                    ifort -openmp hoge.f90
    また、実際に並列計算をさせるにはOMP_NUM_THREADSという環境変数にコア数を与えなければいけない。ジョブスクリプトの例としては
                    #!/bin/bash
                    #PBS -q batch
                    #PBS -e error.log
                    #PBS -o out.log
                    #PBS -l nodes=1:ppn=4
                    #PBS -l walltime=00:10:00:00
                    cd $PBS_O_WORKDIR
                    export OMP_NUM_THREADS=4
                    ./out.exe
    となり、4行目のppnと7行目の環境変数の部分に並列コア数を指定する。

    (注意!) ulimitと併用する場合、環境変数の指定はulimitの前でないと上手く動かない模様。つまり、
                    #!/bin/bash
                    #PBS -q batch
                    #PBS -e error.log
                    #PBS -o out.log
                    #PBS -l nodes=1:ppn=4
                    #PBS -l walltime=00:10:00:00
                    cd $PBS_O_WORKDIR
                    ulimit -s unlimited
                    export OMP_NUM_THREADS=4
                    ./out.exe
    ではダメで、
                    #!/bin/bash
                    #PBS -q batch
                    #PBS -e error.log
                    #PBS -o out.log
                    #PBS -l nodes=1:ppn=4
                    #PBS -l walltime=00:10:00:00
                    cd $PBS_O_WORKDIR
                    export OMP_NUM_THREADS=4
                    ulimit -s unlimited
                    ./out.exe
    としなければならない。
    また$PBS_NUM_PPNを用いて
                    #!/bin/bash
                    #PBS -q batch
                    #PBS -e error.log
                    #PBS -o out.log
                    #PBS -l nodes=1:ppn=4
                    #PBS -l walltime=00:10:00:00
                    cd $PBS_O_WORKDIR
                    export OMP_NUM_THREADS=$PBS_NUM_PPN
                    ./out.exe
    としてもよい。またOpenMP並列を実行したときのみセグメンテーション違反でエラーでプログラムが終了するときにはスタック領域の不足が考えられる。環境変数$OMP_STACKSIZEを用いて
                    export OMP_STACKSIZE=(スタック領域 : KB単位)
    としてスタック領域を増やしておく。プログラムサイズによるが512MB (512000) ぐらいから試してみる。
  • 乱数生成ライブラリmkl_vslのOpenMPを用いた並列化

    do文で並列化すると、1個ずつ乱数を呼び出すため、とても遅くなる。ここは複数の乱数の種を呼び出し、かつ手動で並列化する。
                    program main
                      use mkl_vsl_type
                      use mkl_vsl
                      use omp_lib
                    !
                      implicit none
                      integer,parameter :: n=1000
                      integer :: iprocs,nprocs,nprocsm
                      integer,allocatable :: nru(:),nrd(:),nrp(:)
                      integer :: statvsl,irand,cr,cm
                      type(vsl_stream_state),allocatable :: planvsl(:)
                      real(8) :: fake(1),frand(0:n-1)
                    !
                      !$OMP PARALLEL
                      nprocs=omp_get_max_threads()
                      !$OMP END PARALLEL
                    !
                      if(mod(n,nprocs)==0) then
                        nprocsm=nprocs-1
                        allocate(nru(0:nprocsm),nrd(0:nprocsm),nrp(0:nprocsm))
                        do iprocs=0,nprocsm
                          nrp(iprocs)=n/nprocs
                          nrd(iprocs)=nrp(iprocs)*iprocs
                          nru(iprocs)=nrp(iprocs)*(iprocs+1)-1
                        enddo
                      else
                        nprocsm=n/(n/nprocs+1)
                        allocate(nru(0:nprocsm),nrd(0:nprocsm),nrp(0:nprocsm))
                        do iprocs=0,nprocsm
                          nrp(iprocs)=n/nprocs+1
                          nrd(iprocs)=nrp(iprocs)*iprocs
                          nru(iprocs)=nrp(iprocs)*(iprocs+1)-1
                        enddo
                        nrp(nprocsm)=n-nrd(nprocsm)
                        nru(nprocsm)=n-1
                      endif
                    !
                      allocate(planvsl(0:nprocsm))
                      do iprocs=0,nprocsm
                        call system_clock(irand,cr,cm)
                        statvsl=vslnewstream(planvsl(iprocs),vsl_brng_mt19937,irand+iprocs)
                        statvsl=vdrnggaussian(vsl_method_dgaussian_icdf,planvsl(iprocs),1,fake,0.d0,1.d0)
                      enddo
                    !
                      !$OMP PARALLEL
                      iprocs=omp_get_thread_num()
                      if(iprocs<=nprocsm) then
                        statvsl=vdrnguniform(vsl_method_duniform_std,planvsl(iprocs),nrp(iprocs),frand(nrd(iprocs)),0.d0,1.d0)
                      !$OMP END PARALLEL
                      stop
                    end program main
    iprocs番目のプロセッサはfrand(nrd(iprocs))からfrand(nru(iprocs))までのnrp(iprocs)個の領域に乱数を生成する。このプログラムの欠点は並列化をしない場合でもきちんとOMP_NUM_THREADS=1としておかないといけないことである。

OpenMPI

  • OpenMPI並列ではすべてのコアが、同一のプログラムを実行することになる。最も簡単なOpenMPIプログラムの例を示す。

                    program main
                      implicit none
                      include 'mpif.h'
                      integer :: ierr,nprocs,myrank
                    !
                      call mpi_init(ierr)
                      call mpi_comm_size(mpi_comm_world,nprocs,ierr)
                      call mpi_comm_rank(mpi_comm_world,myrank,ierr)
                    !
                      write(*,*) myrank,nprocs
                    !
                      call mpi_finalize(ierr)
                      stop
                    end program main
    3行目においてMPI並列に必要な関数群を定義しているファイルを読み込んでいる。兎にも角にもこれを読み込まないとMPI並列が出来ないので注意。別のシステムでMPI並列をする場合、mpif.hのファイルの場所がシステムによって異なるので正しい場所に書きなおす必要がある。

    • ierr : エラーメッセージ格納するための整数。

    • nprocs : 全MPI並列数を格納するための整数。

    • myrank : 全MPI並列数のうち、自分が何番目なのかを格納するための整数。

    • 6行目でMPI並列が始まることを宣言する。ここから12行目のmpi_finalizeまでが並列化されることになる。

    • 7行目のMPI組み込み関数mpi_comm_sizeによって、全並列数がnprocsに格納される。mpi_comm_worldはMPIコミュニケーターと呼ばれる変数で、呪文のようなものだと思っても構わない。

    • 8行目のMPI組み込み関数mpi_comm_rankによって、全並列数のうち、自分自身が何番目なのかという番号がmyrankに格納される。つまりmyrankは計算するコアごとに異なる値が割り振られ、並列計算を行うにあたって最も重要な値である。0≤myrank≤nprocs-1である。

    • 12行目のmpi_finalizeにおいてMPI並列の終了を宣言する。すべてのコアの計算が終了した時点で次の行へと進むことになる。もしこの1行がなければ、一番最初に計算を終了したコアが13行目のstopを実行してしまい、他のコアが計算途中なのにもかかわらずプラグラムが終了してしまうことになる。したがってこの1行は非常に重要である。

    10行目でmyrankとnprocsを標準出力に書き出している。このプログラムを4並列で実行すると
                    0 4
                    1 4
                    2 4
                    3 4
    となる(表示される行の順番はバラバラかもしれない)。これは4つのコアが独立に上記のプログラムを実行し、それぞれのコアが10行目に来た時点でmyrankとnprocsを出力した結果である。MPI並列計算をさせるために書かれたプログラムをコンパイルするにはifortの代わりにmpif90(FORTRAN77で書いているのであればmpif77を用いる)を用いる。MPI並列計算をさせるためのジョブスクリプトの例として
                    #!/bin/bash
                    #PBS -q batch
                    #PBS -e error.log
                    #PBS -o out.log
                    #PBS -l nodes=4:ppn=1
                    #PBS -l walltime=00:10:00:00
                    cd $PBS_O_WORKDIR
                    export OMP_NUM_THREADS=1
                    mpirun ./out.exe
    となる。実行にはmpirunを用いること。ここでnodesとppnの違いに注意。ppnはノード内でどれだけ並列させるかというのを指定し、nodesはノード間でどれだけ並列させるかというのを指定する。例えばnodes=1:ppn=4とすると、mauiは現時点で最も速いノードを探し出し、そのノードに対して並列数4のノード内並列計算をさせる。逆にnodes=4:ppn=1とすると1並列ずつノードにジョブを割り振りながらノードの速度を随時測定する。コアの割り振りが決まった時点で、すべてのコアが同一ノードに割り振られていればノード内並列になる(つまりnodes=1:ppn=4としたときとほぼ同じになる)し、複数のノードに割り振られていれば、ノード間並列となる。全並列数=nodes x ppnとなり、例えばnodes=2:ppn=4とするとノード内4並列を各2ノードで行うような8並列の計算となる(nodes=2に対してmauiが同じノードに計算を割り当てれば、ノード内8並列になる)。ノード間並列ではLANケーブルを介したデータ通信を行うため、データ通信が多ければ多いほど並列効率は悪くなる。
  • 完全に独立なdo文の並列化

    いわゆるバカパラと呼ばれる並列計算で、つまり並列化しなくても、ジョブを個別に複数投入すれば代用できる類の並列化である。例えば異なるパラメーターで全く同じ計算をさせたい場合に
                    program main
                      implicit none
                      include 'mpif.h'
                      integer :: ierr,nprocs,myrank
                    !
                      integer :: it
                      real(8) :: t
                      他の変数を定義
                      
                      call mpi_init(ierr)
                      call mpi_comm_size(mpi_comm_world,nprocs,ierr)
                      call mpi_comm_rank(mpi_comm_world,myrank,ierr)
                    !
                      do it=myrank,100,nprocs
                        t=dble(it)/100.d0
                        計算
                      enddo
                    !
                      call mpi_finalize(ierr)
                      stop
                    end program main
    これはパラメータ0≤t≤1においてtを0.01刻みに動かして計算させる際に、パラメータの動かし方を並列化させたという最も簡単な並列化である。この並列計算において注意したい点は、101通りのtの計算が各コアごとに均等に割り振られるため、特にノード間並列をさせた場合に速いコアは先に終わってずっと待っており、遅いコアがずっと計算することになる。これを回避するために次のような改良が考えられる。
                    program main
                      implicit none
                      include 'mpif.h'
                      integer :: ierr,nprocs,myrank
                      character(len=20) :: crank
                    !
                      integer :: it
                      real(8) :: t
                      他の変数を定義
                      
                      call mpi_init(ierr)
                      call mpi_comm_size(mpi_comm_world,nprocs,ierr)
                      call mpi_comm_rank(mpi_comm_world,myrank,ierr)
                    !
                      it=myrank
                      if(myrank==0) then
                        open(10,file='next',form='binary')
                        write(10) nprocs
                        close(10)
                      endif
                      write(crank,*) myrank
                      call system('sleep '//crank)
                    !
                      do
                        t=dble(it)/100.d0
                        計算
                        open(10,file='next',form='binary',status='old')
                        read(10) it
                        close(10)
                        if(it>100) exit
                        open(10,file='next',form='binary')
                        write(10) it+1
                        close(10)
                      enddo
                    !
                      call mpi_finalize(ierr)
                      stop
                    end program main
    これは各tにおいてdo文の計算が終わったコアから次のtの計算を順番に行っていくためのプログラムである。ここでnextというファイルを作成しているが、ここには次に計算すべきitの値を書き出しておく。do文の一番最後まで来たコアは、nextを読み込んで次のitの値を知り、nextにit+1を上書きするという寸法である。nfsのファイル共有にタイムラグをもたせているようなシステムの場合、この方法は使えない(nextの更新が遅れてすでに終わっているitの計算を再度別のコアが行う可能性があるため)。nextへの書き込みおよび読み込みに対して複数のコアが集中するのを防ぐためにdo文の直前で(myrank)秒だけ計算を止め、コアごとに計算のタイミングを少しずらしている。
  • Monte Carlo計算などでアンサンブルを稼ぎたい場合に、各アンサンブルの計算を並列化させるという並列計算が考えられる。この場合は平均値等を計算するために、最後にそれぞれのコアで計算された値を足し合わせる必要がある。前の例を一歩進めて書き直せば

                    program main
                      implicit none
                      include 'mpif.h'
                      integer :: ierr,nprocs,myrank
                      character(len=20) :: crank
                    !
                      integer :: it
                      real(8) :: t,s,st
                      他の変数を定義
                    !
                      call mpi_init(ierr)
                      call mpi_comm_size(mpi_comm_world,nprocs,ierr)
                      call mpi_comm_rank(mpi_comm_world,myrank,ierr)
                    !
                      it=myrank
                      if(myrank==0) then
                        open(10,file='next.bin',form='binary')
                        write(10) nprocs
                        close(10)
                      endif
                      write(crank,*) myrank
                      call system('sleep '//crank)
                    !
                      s=0.d0
                      do
                        計算
                        s=s+st
                    !
                        open(10,file='next.bin',form='binary',status='old')
                        read(10) it
                        close(10)
                        if(it>100) exit
                        open(10,file='next.bin',form='binary')
                        write(10) it+1
                        close(10)
                      enddo
                      mpi_allreduce(mpi_in_place,s,1,mpi_double_precision,mpi_sum,mpi_comm_world,ierr)
                    !
                      if(myrank==0) then
                        open(11,file='average.dat',form='formatted')
                        write(11,'(e26.16e3)') s/101.d0
                        close(11)
                      endif
                    !
                      call mpi_finalize(ierr)
                      stop
                    end program main
    これは101個のアンサンブルを並列化して計算させ、アンサンブル毎の値stを各コアで足し合わせる(=s)。最後にコア毎のsをすべて足し合わせる必要があるが、ここでデータ通信を行う。そのためのコマンドがmpi_allreduceであり、これは
                    mpi_allreduce(通信したい変数,結果を格納する変数,通信したいデータ数,データの型,通信する際に行う演算,mpiコミュニケータ,エラーメッセージ)
    の形で用いる。通信したい変数と結果を格納する変数を同じにしたい場合、通信したい変数の部分をmpi_in_placeとする。こうすると各ノードのsの和を計算して各ノードのsに格納する。また通信するデータは配列であっても良い。その場合は通信したい配列の一番最初のアドレスを指定し、通信したいデータ数を指定する。データの型としてmpi_integer,mpi_double_precision,mpi_double_complexなどがある。また上の例ではデータ通信の際にそれぞれのコアのデータを足し合わせるように指定したが、その場合にはmpi_sumとなる。その他にもmpi_productのように掛けたりする演算など色々指定できる。また、配列を指定した場合は配列の要素ごとに和をとったり積をとったりする。詳しくはマニュアルを参照。
    基本的に通信を行うためのコマンドを指定すると、すべてのコアがそのコマンドの部分に来た時点で(つまり速いコアはそこで待っている)実行されるが、意図的にコアの同期(コアの足並みをそろえる)を行いたい場合には
                    mpi_barrier(mpi_comm_world,ierr)
    とする。

    独立ではないdo文の並列化 : 差分法

    差分法であれば、普段ならOpenMPで全く問題ないが、乱数生成等との兼ね合いや、メモリの節約等でどうしてもOpenMPIを使いたい場合もあるので載せておく。また、差分法はMPI並列コードの格好の練習にもなる。今、2次元格子Nx X Nyを考え、Nyを並列化することを考える。また並列数をMとしたときにNy/Mは割り切れるとしておく。
                    program main
                      implicit none
                      include 'mpif.h'
                      integer :: ierr,nprocs,myrank
                      integer :: istatus(mpi_status_size)
                      integer :: kup,kdown
                    !
                      integer,parameter :: nx=100,ny=100
                      integer,parameter :: nxm=nx-1,nym=ny-1
                      integer :: ix,iy
                      integer :: nyi,nyf,nyr
                      real(8),allocatable :: f(:,:),fd(:,:)
                    !
                      call mpi_init(ierr)
                      call mpi_comm_size(mpi_comm_world,nprocs,ierr)
                      call mpi_comm_rank(mpi_comm_world,myrank,ierr)
                    !
                      kup=myrank+1
                      kdown=myrank-1
                      if(myrank==0) kdown=mpi_proc_null
                      if(myrank==nprocs-1) kup=mpi_proc_null
                    !
                      if(mod(ny,nprocs)/=0) stop
                      nyr=ny/nprocs
                      nyi=myrank*nyr
                      nyf=(myrank+1)*nyr-1
                    !
                      allocate(f(-1:nx,nyi-1:nyf+1),fd(0:nxm,nyi:nyf))
                    !
                      do iy=nyi,nyf
                        do ix=0,nxm
                          f(ix,iy)=fに値を代入
                        enddo
                      enddo
                    !
                      do iy=nyi,nyf
                        f(-1,iy)=0.d0
                        f(nx,iy)=0.d0
                      enddo
                      if(myrank==0) then
                        do ix=0,nxm
                          f(ix,-1)=0.d0
                        enddo
                      else if(myrank==nprocs-1) then
                        do ix=0,nxm
                          f(ix,ny)=0.d0
                        enddo
                      endif
                    !
                      call mpi_sendrecv(f(0,nyf),nx,mpi_double_precision,kup,1,f(0,nyi-1),nx,mpi_double,precision,kdown,1,mpi_comm_world,istatus,ierr)
                      call mpi_sendrecv(f(0,nyi),nx,mpi_double_precision,kdown,1,f(0,nyf+1),nx,mpi_double,precision,kup,1,mpi_comm_world,istatus,ierr)
                    !
                      do iy=nyi,nyf
                        do ix=0,nxm
                          fd(ix,iy)=f(ix+1,iy)+f(ix-1,iy)+f(ix,iy+1)+f(ix,iy-1)-4.d0*f(ix,iy)
                        enddo
                      enddo
                    !
                      call mpi_finalize(ierr)
                      stop
                    end program main

    独立ではないdo文の並列化 : フーリエ変換

    シングルノード用のフーリエ変換のルーチンを用いて3次元フーリエ変換を並列化する。IntelのMKLならばMPI用のフーリエ変換のルーチンがあるが、マニュアルが非常に適当で、かつ動くかどうかが環境に非常に依存しており、特にMPIのバージョンなどによっては動かない仕様なので (これで本当に有用と言えるのか?) 、自前でプログラムを作ることを試みる。なおこのやり方は全く最適でなく、IntelのMKLのルーチンに比べて10%ほど余分に時間がかかった。簡単のため、空間は立方体であるとし、差分法のときと同様にNz/Mは割り切れるとしておく (Nx=Ny=Nz) 。
                    program main
                      implicit none
                      include 'mpif.h'
                      integer :: ierr,ireq,nprocs,nprocsm,myrank
                      integer :: istatus(mpi_status_size)
                    !
                      integer,parameter :: nx=512
                      integer,parameter :: nxm=nx-1
                      integer :: ix,iy,iz,iranky,irankz
                      integer :: nzi,nzf,nzr
                      integer,allocatable :: nzio(:),nzfo(:)
                      complex(8),allocatable :: zf(:,:,:),zft(:,:,:),zf1(:)
                    !
                      call mpi_init(ierr)
                      call mpi_comm_size(mpi_comm_world,nprocs,ierr)
                      call mpi_comm_rank(mpi_comm_world,myrank,ierr)
                      nprocsm=nprocs-1
                    !
                      if(mod(nx,nprocs)/=0) stop
                      nzr=nx/nprocs
                      nzi=myrank*nzr
                      nzf=(myrank+1)*nzr-1
                    !
                      allocate(nzio(0:nprocsm),nzfo(0:nprocsm))
                      do irankz=0,nprocsm
                        nzio(irankz)=irankz*nzr
                        nzfo(irankz)=(irankz+1)*nzr-1
                      enddo
                    !
                      allocate(zf(0:nxm,0:nxm,nzi:nzf),zft(0:nxm,nzi:nzf,0:nxm),zf1(0:nxm))
                    !
                      do iz=nzi,nzf
                        do iy=0,nxm
                          do ix=0,nxm
                            zf(ix,iy,iz)=zfに値を代入
                          enddo
                        enddo
                      enddo
                    !
                      do iz=nzi,nzf
                        call forwardfft2d(zf(0,0,iz)) !シングルノード用の2次元フーリエ変換
                      enddo
                    !
                      do irankz=0,nprocsm
                        do iranky=0,nprocsm
                          if(irankz==myrank) then
                            if(iranky==myrank) then
                              do iz=nzi,nzf
                                do iy=nzi,nzf
                                  do ix=0,nxm
                                    zft(ix,iy,iz)=zf(ix,iy,iz)
                                  enddo
                                enddo
                              enddo
                            else
                              do iz=nzi,nzf
                                do iy=nzio(iranky),nzfo(iranky)
                                  call mpi_isend(zf(0,iy,iz),nx,mpi_double_complex,iranky,1,mpi_comm_world,ireq,ierr)
                                  call mpi_wait(ireq,istatus,ierr)
                                enddo
                              enddo
                            endif
                          else if(iranky==myrank) then
                            do iz=nzio(irankz),nzfo(irankz)
                              do iy=nzi,nzf
                                call mpi_irecv(zft(0,iy,iz),nx,mpi_double_complex,iranky,1,mpi_comm_world,ireq,ierr)
                                call mpi_wait(ireq,istatus,ierr)
                              enddo
                            enddo
                          endif
                        enddo
                      enddo
                    !
                      do iy=nzi,nzf
                        do ix=0,nxm
                          zf1(iz)=zft(ix,iy,iz)
                        enddo
                        call forwardfft1d(zf1) !シングルノード用の1次元フーリエ変換
                        do ix=0,nxm
                          zft(ix,iy,iz)=zf1(iz)
                        enddo
                      enddo
                    !
                      do irankz=0,nprocsm
                        do iranky=0,nprocsm
                          if(irankz==myrank) then
                            if(iranky==myrank) then
                              do iz=nzi,nzf
                                do iy=nzi,nzf
                                  do ix=0,nxm
                                    zf(ix,iy,iz)=zft(ix,iy,iz)
                                  enddo
                                enddo
                              enddo
                            else
                              do iz=nzi,nzf
                                do iy=nzio(iranky),nzfo(iranky)
                                  call mpi_irecv(zf(0,iy,iz),nx,mpi_double_complex,iranky,1,mpi_comm_world,ireq,ierr)
                                  call mpi_wait(ireq,istatus,ierr)
                                enddo
                              enddo
                            endif
                          else if(iranky==myrank) then
                            do iz=nzio(irankz),nzfo(irankz)
                              do iy=nzi,nzf
                                call mpi_isend(zft(0,iy,iz),nx,mpi_double_complex,iranky,1,mpi_comm_world,ireq,ierr)
                                call mpi_wait(ireq,istatus,ierr)
                              enddo
                            enddo
                          endif
                        enddo
                      enddo
                    !
                      call mpi_finalize(ierr)
                      stop
                    end program main

    独立ではないdo文の並列化:レプリカ交換Monte Carlo

    レプリカ交換Monte Carloの並列化とは、異なるノードで異なる温度の計算を行いながら、必要に応じてノード内あるいはノード間で温度の交換を行うことである (温度の代わりに系を入れ替えると、大きなデータの転送が起こるため、絶対に行ってはならない) 。続きは後日。
  • MPIを用いた並列計算と用いない計算で、全く同じ計算をしているにも関わらず、微妙に結果が異なる場合がある (カオスのシミュレーションなんかだと、長時間発展後に全然違う結果になったりする) 。この差異の最も大きな要因の1つはmpi_reduceを用いたノード間演算にある (演算の順序が変わるため) 。例えばmpi_sumを用いた倍精度実数の和と+におけるノード内の倍精度実数の和を比較すると差が出る。これを避けたい場合は面倒だがmpi_sendrecvやmpi_gatherあたりでデータを転送してからノード内で必要な演算をすればいい。どちらが正しいという問題ではないが。

FFT

  • フーリエ変換の入出力

    \( \displaystyle \qquad B(K) = \sum_X e^{- i K X} A(X) \) \( \displaystyle \qquad A(X) = \frac{1}{N} \sum_K e^{i K X} B(K) \)

    において、波数\(K\)は\(K = 2 \pi J / N\ (0 \leq J \leq N / 2)\)および\(K = - 2 \pi (N - J) / N\ (N / 2 + 1 \leq J \leq N - 1)\)だと思って差し支えない。また、逆変換にかかるべき因子\(1 / N\)は通常はかからないので手で入れるべきである。mklのライブラリの場合
                    statdft=dftisetvalue(plandft,dfti_backward_scale,1.d0/dble(N))
    のようにスケール因子をパラメーターに組み込めるので用いること。
  • スタンダードな2次元FFTに対するモジュールの例を示す。

  • Intel Math Kernel Libraryを用いたFFT

                    module intel_fft
                      use mkl_dfti
                    !
                      implicit none
                      integer,parameter :: nx=512,ny=512
                      integer,parameter :: nxy=nx*ny
                      type(dfti_descriptor),pointer,save,private :: plandft
                      integer,save,private :: statdft
                    !
                    !
                      contains
                    !
                    !
                      subroutine intel_fft_ini
                        statdft=dfticreatedescriptor(plandft,dfti_double,dfti_complex,2,(/nx,ny/))
                        statdft=dftisetvalue(plandft,dfti_backward_scale,1.d0/dble(nxy))
                        statdft=dfticommitdescriptor(plandft)
                        return
                      end subroutine intel_fft_ini
                    !
                    !
                      subroutine forwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nxy)
                        statdft=dfticomputeforward(plandft,zfunc)
                      end subroutine forwardfft
                    !
                    !
                      subroutine backwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nxy)
                        statdft=dfticomputebackward(plandft,zfunc)
                      end subroutine backwardfft
                    end module intel_fft
    プログラムの最初に一度だけintel_fft_iniを実行する。特別の理由 (実サイン変換・コサイン変換を使う、4倍精度を使う) が無い限り、これ一択で問題ないと思う。OpenMP並列も自動でやってくれる。コンパイルはファイルのある場所にモジュールを格納するディレクトリ (ここではmod) を作っておいて
                    ifort /opt/intel/compilers_and_libraries/linux/mkl/include/mkl_dfti.f90 -c (追加オプション)  -module mod
                    ifort hoge.f90 -c -L /opt/intel/compilers_and_libraries/linux/mkl/lib/intel64/ -mkl -lpthread (追加オプション)  -module mod
                    ifort hoge.o -L /opt/intel/compilers_and_libraries/linux/mkl/lib/intel64/ -mkl -lpthread (追加オプション)  -module mod -o hoge.exe
    とする。
  • Intel MKL Real DFT CCE形式の扱いの注意点

    IntelのMKLを用いて実数の高次元フーリエ変換を行う場合、フーリエ成分の格納にはCCE, CCS, Pack, Permの4つの形式がある。PackとPermが実成分とフーリエ成分で同じ配列の成分数となるので、 (格納の仕方についてはマニュアルを見なければいけないが) 数値計算的には使いやすいと思う。個人的にはPermの方が好み。CCSは論外。ただしCCS, Pack, Permは2次元までしかサポートしておらず、3次元では使えない。CCEは複素フーリエ変換と同じ場所にフーリエ成分が格納され、その配列サイズは、実成分が\( n_x \times n_y\)ならば\((n_x/2+1) \times n_y\)となる。したがって余分な配列を持つことになる。具体的には2次元FFTの場合、フーリエ成分を\( B(i,j) \)としたときに
    \( B(0,0) \), \( B(n_x/2,0) \), \( B(0,n_y/2) \), \( B(n_x/2,n_y/2) \)の虚部が\( 0 \)
    \( B(0,n_y/2+1 \le j \le n_y-1) \)は\( B(0,n_y-j) \)の複素共役
    \( B(n_x/2,n_y/2+1 \le j \le n_y-1) \)は\( B(n_x/2,n_y-j) \)の複素共役
    となっている。3次元の場合は
    \( B(0,0,0) \), \( B(n_x/2,0,0) \), \( B(0,n_y/2,0) \), \( B(n_x/2,n_y/2,0) \), \( B(0,0,0) \), \( B(n_x/2,0,n_z/2) \), \( B(0,n_y/2,n_z/2) \), \( B(n_x/2,n_y/2,n_z/2) \)の虚部が\( 0 \)
    \( B(0,n_y/2+1 \le j \le n_y-1,0) \)は\( B(0,n_y-j,0) \)の複素共役
    \( B(0,n_y/2+1 \le j \le n_y-1,n_z/2) \)は\( B(0,n_y-j,n_z/2) \)の複素共役
    \( B(n_x/2,n_y/2+1 \le j \le n_y-1,0) \)は\( B(n_x/2,n_y-j,0) \)の複素共役
    \( B(n_x/2,n_y/2+1 \le j \le n_y-1,n_z/2) \)は\( B(n_x/2,n_y-j,n_z/2) \)の複素共役
    \( B(0,0 \le j \le n_y-1,n_z/2+1 \le k \le n_z-1) \)は\( B(0,n_y-j,n_z-k) \)の複素共役
    \( B(n_x/2,0 \le j \le n_y-1,n_z/2+1 \le k \le n_z-1) \)は\( B(n_x/2,n_y-j,n_z-k) \)の複素共役
    となっている。順方向フーリエ変換は問題ないが、逆方法フーリエ変換を行う際に、フーリエ成分が上記の性質を満たしていないと、変な結果となる。どうやら参照しないデータがあるというわけでもないみたい。フーリエ変換をフィルタリングやデータ圧縮、微分積分を求めるのに使うだけであれば、上記の性質が破れることはあまりないみたいだが、例えば、波数空間でランジュバン方程式を解くときなどは、上記の性質を満たすように解かないと大変なことになる。何が言いたいかというと、これぐらいのことマニュアルに書いておけこのバカ野郎ってこと。 また、CCE形式は実成分が実数、フーリエ成分が複素数になるので、結果の配置は素直に上書きしない (dfti_not_inplace) にしたほうが良い。注意しておきたいのは、実成分とフーリエ成分でストライドが通常異なることと、順方向フーリエ変換は実成分が入力、フーリエ成分が出力になるのに対して、逆方向フーリエ変換はその逆になるので、順方向と逆方向で異なるディスクリプターを生成しておかなければならない。3次元の場合の具体例としては
                    implicit none
                    type(dfti_descriptor),pointer,save,private :: plandftf,plandftb
                    integer,save,private :: statdftf,statdftb
                    !
                    statdftf=dfticreatedescriptor(plandftf,dfti_double,dfti_real,3,(/nx,ny,nz/))
                    statdftf=dftisetvalue(plandftf,dfti_conjugate_even_storage,dfti_complex_complex)
                    statdftf=dftisetvalue(plandftf,dfti_placement,dfti_not_inplace)
                    statdftf=dftisetvalue(plandftf,dfti_input_strides,(/0,1,nx,nx*ny/))
                    statdftf=dftisetvalue(plandftf,dfti_output_strides,(/0,1,nx/2+1,(nx/2+1)*ny/))
                    statdftf=dfticommitdescriptor(plandftf)
                    !
                    statdftb=dfticreatedescriptor(plandftb,dfti_double,dfti_real,3,(/nx,ny,nz/))
                    statdftb=dftisetvalue(plandftb,dfti_conjugate_even_storage,dfti_complex_complex)
                    statdftb=dftisetvalue(plandftb,dfti_placement,dfti_not_inplace)
                    statdftb=dftisetvalue(plandftb,dfti_backward_scale,1.d0/dble(nx*ny*nz))
                    statdftb=dftisetvalue(plandftb,dfti_input_strides,(/0,1,nx/2+1,(nx/2+1)*ny/))
                    statdftb=dftisetvalue(plandftb,dfti_output_strides,(/0,1,nx,nx*ny/))
                    statdftb=dfticommitdescriptor(plandftb)
    として、順方向フーリエ変換はplandftfを、逆方向フーリエ変換はplandftbを使うと良い。何が言いたいかというと、これぐらいのことマニュアルに書いておけこのバカ野郎ってこと。 (Packや) Perm形式ではこのような問題は当然起こらない。ちなみにPerm形式を使いたい場合は
                    type(dfti_descriptor),pointer,save,private :: plandft
                    integer,save,private :: statdft
                    statdft=dfticreatedescriptor(plandft,dfti_double,dfti_real,2,(/nx,ny/))
                    statdft=dftisetvalue(plandft,dfti_backward_scale,1.d0/dble(nx*ny))
                    statdft=dftisetvalue(plandft,dfti_real_storage,dfti_real_complex)
                    statdft=dftisetvalue(plandft,dfti_packed_format,dfti_pack_format)
                    statdft=dfticommitdescriptor(plandft)
    とする。3次元フーリエ変換でどうしてもPerm形式を使いたい場合は、京都大学の大浦氏のサイトで配布されているものを使うと良い。ただしサイズは2の冪に限定。fftwはそもそもCCE形式以外の形式の使用を推奨していないので却下 (DCTをサポートしているのでそれらを使いたい場合には使えるのだが。っていうかなんでIntelは馬鹿高い金をとっておきながらDCTをサポートしていないんだ?) 。
  • FFTW (バージョン3) を用いたFFT

    FFTW自体のインストールはファイル解凍後 (UbuntuならUbuntuソフトウェアセンターからインストール可能) 作成されたディレクトリに移動して
                    ./configure
                    make
                    make install
    でできる。OpenMPによる並列化やOpenMPIによる並列化をする予定なのであれば1行目は
                    ./configure --enable-threads --enable-mpi
    としておく。モジュールは多少面倒で、フーリエ変換する配列をあらかじめ指定しておかなければならない。
                    module fftw_fft
                      use,intrinsic :: iso_c_binding
                    !
                      implicit none
                      include '/usr/local/include/fftw3.f03'
                      integer,parameter :: nx=512,ny=512
                      integer,parameter :: nxy=nx*ny
                      type(c_ptr),save,private :: plandftf,plandftb
                      real(8),private :: xlxy
                      complex(8),private :: zf(nx,ny)
                    !
                    !
                      contains
                    !
                    !
                      subroutine fftw_fft_ini
                        plandftf=fftw_plan_dft_2d(nx,ny,zf,zf,fftw_forward,fftw_measure)
                        plandftb=fftw_plan_dft_2d(nx,ny,zf,zf,fftw_backward,fftw_measure)
                        xlxy=dble(nxy)
                        return
                      end subroutine fftw_fft_ini
                    !
                    !
                      subroutine forwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nx,ny)
                        zf(1:nx,1:ny)=zfunc(1:nx,1:ny)
                        call fftw_execute_dft(plandftf,zf,zf)
                        zfunc(1:nx,1:ny)=zf(1:nx,1:nx)
                      end subroutine forwardfft
                    !
                    !
                      subroutine backwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nx,ny)
                        zf(1:nx,1:ny)=zfunc(1:nx,1:ny)
                        call fftw_execute_dft(plandftb,zf,zf)
                        zfunc(1:nx,1:ny)=zf(1:nx,1:nx)/xlxy
                      end subroutine backwardfft
                    end module fftw_fft
    OpenMPによる並列化を行う場合、プログラム中で並列数を指定しておく必要がある。環境変数OMP_NUM_THREADSの値をそのまま並列数にする場合には
                    module fftw_fft
                      use,intrinsic :: iso_c_binding
                      use omp_lib
                    !
                      implicit none
                      include '/usr/local/include/fftw3.f03'
                      integer,parameter :: nx=512,ny=512
                      type(c_ptr),save,private :: plandftf,plandftb
                      integer,private :: iret,nprocs
                      real(8),private :: xlxy
                      complex(8),private :: zf(nx,ny)
                    !
                    !
                      contains
                    !
                    !
                      subroutine fftw_fft_ini
                        !$OMP PARALLEL
                        nprocs=omp_get_max_threads()
                        !$OMP END PARALLEL
                    !
                        call dfftw_init_threads(iret)
                        call dfftw_plan_with_nthreads(nprocs)
                    !
                        plandftf=fftw_plan_dft_2d(nx,ny,zf,zf,fftw_forward,fftw_measure)
                        plandftb=fftw_plan_dft_2d(nx,ny,zf,zf,fftw_backward_fftw_measure)
                        xlxy=dble(nxy)
                        return
                      end subroutine fftw_fft_ini
                    !
                    !
                      subroutine forwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nx,ny)
                        !$OMP WORKSHARE
                        zf(1:nx,1:ny)=zfunc(1:nx,1:ny)
                        !$OMP END WORKSHARE
                        call fftw_execute_dft(plandftf,zf,zf)
                        !$OMP WORKSHARE
                        zfunc(1:nx,1:ny)=zf(1:nx,1:nx)
                        !$OMP END WORKSHARE
                      end subroutine forwardfft
                    !
                    !
                      subroutine backwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nx,ny)
                        !$OMP WORKSHARE
                        zf(1:nx,1:ny)=zfunc(1:nx,1:ny)
                        !$OMP END WORKSHARE
                        call fftw_execute_dft(plandftb,zf,zf)
                        !$OMP WORKSHARE
                        zfunc(1:nx,1:ny)=zf(1:nx,1:nx)/xlxy
                        !$OMP END WORKSHARE
                      end subroutine backwardfft
                    end module fftw_fft
    としておけばよい。またx方向だけフーリエ変換する場合で並列化する場合にはfftwに対して並列数を宣言する必要はなく
                    module fftw_fft
                      use,intrinsic :: iso_c_binding
                      use omp_lib
                    !
                      implicit none
                      include '/usr/local/include/fftw3.f03'
                      integer,parameter :: nx=512,ny=512
                      type(c_ptr),save,private :: plandftf,plandftb
                      real(8),private :: xlx
                      complex(8),private :: zf(nx)
                    !
                    !
                      contains
                    !
                    !
                      subroutine fftw_fft_ini
                        plandftf=fftw_plan_dft_1d(nx,zf,zf,fftw_forward_fftw_measure)
                        plandftb=fftw_plan_dft_1d(nx,zf,zf,fftw_backward,fftw_measure)
                        xlx=dble(nx)
                        return
                      end subroutine fftw_fft_ini
                    !
                    !
                      subroutine forwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nx,ny)
                        integer :: iy
                        !$OMP PARALLEL private(zf)
                        !$OMP DO
                        do iy=1,ny
                          zf(1:nx)=zfunc(1:nx,iy)
                          call fftw_execute_dft(plandftf,zf,zf)
                          zfunc(1:nx,iy)=zf(1:nx)
                        enddo
                        !$OMP END DO
                        !$OMP END PARALLEL
                      end subroutine forwardfft
                    !
                    !
                      subroutine backwardfft(zfunc)
                        complex(8),intent(inout) :: zfunc(nx,ny)
                        integer :: iy
                        !$OMP PARALLEL private(zf)
                        !$OMP DO
                        do iy=1,ny
                          zf(1:nx)=zfunc(1:nx,iy)
                          call fftw_execute_dft(plandftb,zf,zf)
                          zfunc(1:nx,iy)=zf(1:nx)/xlx
                        enddo
                        !$OMP END DO
                        !$OMP END PARALLEL
                      end subroutine backwardfft
                    end fftw_fft
    コンパイルは、OpenMP並列をしない最初の例であれば
                    ifort hoge.f90 -L /opt/intel/compilers_and_libraries/linux/mkl/lib/intel64/ -L /usr/local/lib -lfftw3 -lm -mkl -lpthread (追加オプション)  -module mod -o hoge.exe
    並列をするのであれば
                    ifort hoge.f90 -L /opt/intel/compilers_and_libraries/linux/mkl/lib/intel64/ -L /usr/local/lib -lfftw3_threads -lfftw3 -lm -mkl -lpthread -qopenmp (追加オプション)  -module mod -o hoge.exe

速度比較

幾つかの単純計算の速度を比較してみる (全部-O0オプションで比較したので-O3オプションでは速度差は縮まる可能性あり) 。

  • 複素数の絶対値自乗 (シングルコアで10000000000回くらい計算)

                    cdabs(f)**2
    平均27.5秒
                    dreal(dconjg(f)*f)
    平均9.8秒
                    dreal(f)**2+dimag(f)**2
    平均10.5秒 演算回数は3番目の方が少ないと思っていたが意外にも2番目の方が速い。ただしその差は小さい。
  • 複素数の偏角

                    datan2(dimag(f),dreal(f))
    平均37.8秒
                    dimag(cdlog(f))
    平均1分45.7秒
    後者の方が遅いとは思っていたが、ここまで差があるとは意外。
  • 虚数乗

                    dcmplx(dcos(f),dsin(f))
    平均1分57.3秒
                    cdexp(dcmplx(0.d0,f))
    平均1分58.2秒
    偏角の方と違ってこちらは差が小さい。
  • ガウス乱数作成 (Intel MKL使用 100000乱数生成を100000回計算)

                    vdrnggaussian(vsl_rng_method_gaussian_icdf,stream,100000,f,0.d0,1.d0)
    平均1分3.3秒
                    vdrnggaussian(vsl_rng_method_gaussian_boxmuller,stream,100000,f,0.d0,1.d0)
    平均2分44.7秒
                    vdrnggaussian(vsl_rng_method_gaussian_boxmuller2,stream,100000,f,0.d0,1.d0)
    平均1分47.1秒

Fortranに関して一言

Fortranはfor取らん