
julialang / distributed.jl


Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.

Home Page: https://docs.julialang.org/en/v1/stdlib/Distributed/

License: MIT License

Julia 100.00%
distributed-computing julia parallel-computing

distributed.jl's Introduction

Distributed

The Distributed package provides functionality for creating and controlling multiple Julia processes remotely, and for performing distributed and parallel computing. It uses network sockets or other supported interfaces to communicate between Julia processes, and relies on Julia's Serialization stdlib package to transform Julia objects into a format that can be transferred between processes efficiently. It provides a full set of utilities to create and destroy new Julia processes and add them to a "cluster" (a collection of Julia processes connected together), as well as functions to perform Remote Procedure Calls (RPC) between the processes within a cluster. See API for details.

This package ships as part of the Julia stdlib.

Using development versions of this package

To use a newer version of this package, you need to build Julia from scratch. The build process is the same as any other build except that you need to change the commit used in stdlib/Distributed.version.

It's also possible to load a development version of the package using the approach described in the section "Using the development version of Pkg.jl" in the Pkg.jl repo, but the capabilities are limited: all other packages depend on the stdlib version of the package and will not work with the modified one.

API

The public API of Distributed consists of a variety of functions for various tasks; for creating and destroying processes within a cluster:

  • addprocs - create one or more Julia processes and connect them to the cluster
  • rmprocs - shutdown and remove one or more Julia processes from the cluster

For controlling other processes via RPC:

  • remotecall - call a function on another process and return a Future referencing the result of that call
  • Future - an object that references the result of a remotecall that hasn't yet completed - use fetch to return the call's result, or wait to just wait for the remote call to finish
  • remotecall_fetch - the same as fetch(remotecall(...))
  • remotecall_wait - the same as wait(remotecall(...))
  • remote_do - like remotecall, but does not provide a way to access the result of the call
  • @spawnat - like remotecall, but in macro form
  • @spawn - like @spawnat, but the target process is picked automatically
  • @fetch - macro equivalent of fetch(@spawn expr)
  • @fetchfrom - macro equivalent of fetch(@spawnat p expr)
  • myid - returns the Int identifier of the process calling it
  • nprocs - returns the number of processes in the cluster
  • nworkers - returns the number of worker processes in the cluster
  • procs - returns the set of IDs for processes in the cluster
  • workers - returns the set of IDs for worker processes in the cluster
  • interrupt - interrupts the specified process
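
As a rough illustration of how the RPC functions listed above fit together, here is a minimal sketch. It assumes a local session in which two workers can be started with addprocs(2); the variable names (w, fut, result, same, id_there) are only illustrative.

```julia
using Distributed

# Start two local workers if this session has none yet.
if nprocs() == 1
    addprocs(2)
end

w = first(workers())                 # ID of one worker process

fut = remotecall(+, w, 1, 2)         # returns a Future right away
result = fetch(fut)                  # blocks until the remote call has finished

same = remotecall_fetch(+, w, 1, 2)  # shorthand for fetch(remotecall(...))
id_there = @fetchfrom w myid()       # macro form, evaluated on worker w
```

Here result and same both hold the sum computed on the worker, and id_there is the worker's own ID.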

For communicating between processes in the style of a channel or stream:

  • RemoteChannel - a Channel-like object that can be put! to or take! from any process
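
A small sketch of the channel style of communication, assuming one local worker is available. The channel is created on the calling process with a buffer size chosen arbitrarily for the example, and the names results and collected are hypothetical.

```julia
using Distributed

if nprocs() == 1
    addprocs(1)
end
w = first(workers())

# A RemoteChannel backed by a buffered Channel{Int} that lives on this process.
results = RemoteChannel(() -> Channel{Int}(10))

# Run a function on the worker that pushes three values into the channel,
# and wait until it has finished.
remotecall_wait(w, results) do ch
    for i in 1:3
        put!(ch, i^2)
    end
end

# Read the values back on the calling process.
collected = [take!(results) for _ in 1:3]
```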

For controlling multiple processes at once:

  • WorkerPool - a collection of processes that can be passed instead of a process ID to certain APIs
  • CachingPool - like WorkerPool, but caches functions (including closures which capture large data) on each process
  • @everywhere - runs a block of code on all (or a subset of all) processes and waits for them all to complete
  • pmap - performs a map operation where each element may be computed on another process
  • @distributed - implements a for-loop where each iteration may be computed on another process
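
The multi-process helpers above might be combined as in the following sketch. It assumes two local workers; the function square is a made-up example, defined with @everywhere so that every process knows it.

```julia
using Distributed

if nprocs() == 1
    addprocs(2)
end

# Define the function on all processes, then wait for that to complete.
@everywhere square(x) = x^2

# Each element may be computed on a different worker; results keep input order.
squares = pmap(square, 1:5)

# A distributed loop with a reducer: partial sums are combined with (+).
total = @distributed (+) for i in 1:100
    i
end
```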

Process Identifiers

Julia processes connected with Distributed are all assigned a cluster-unique Int identifier, starting from 1. The first Julia process within a cluster is given ID 1, while other processes added via addprocs get incrementing IDs (2, 3, etc.). Functions and macros which communicate from one process to another usually take one or more identifiers to determine which process they target - for example, remotecall_fetch(myid, 2) calls myid() on process 2.

!!! note Only process 1 (often called the "head", "primary", or "master") may add or remove processes, and manages the rest of the cluster. Other processes (called "workers" or "worker processes") may still call functions on each other and send and receive data, but addprocs/rmprocs on worker processes will fail with an error.
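
To make the identifier scheme concrete, the sketch below queries the IDs from the head process. It assumes a session started without workers, so the exact worker IDs are not asserted, only their relationship to process 1.

```julia
using Distributed

if nprocs() == 1
    addprocs(2)
end

head = myid()            # ID of the process running this code
all_ids = procs()        # IDs of every process in the cluster
worker_ids = workers()   # IDs of the worker processes only

# Ask each worker for its own ID; each answer should match the ID we targeted.
reported = [remotecall_fetch(myid, p) for p in worker_ids]
```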

distributed.jl's People

Contributors

amitmurthy, andrioni, ararslan, aviatesk, c42f, dilumaluthge, fredrikekre, jeffbezanson, jishnub, jpsamaroo, keno, kristofferc, krynju, kshyatt, mgkuhn, musm, nalimilan, quinnj, rfourquet, sacha0, ssikdar1, staticfloat, stefankarpinski, stevengj, tanmaykm, timholy, tkelman, vchuravy, viralbshah, vtjnash

distributed.jl's Issues

need periodic distributed GC for quicker release of remote objects

Overwriting RemoteRef variables does not release memory.

Start Julia from bash: julia -p 2
Check memory: free -m (used = 7398)
In Julia: r = remotecall(2, rand, 17000, 17000);
Check memory: free -m (used = 9617)

Now overwrite r:

In Julia: r = remotecall(2, rand, 17000, 17000);
Check memory: free -m (used = 11834)
In Julia: gc()
Check memory: free -m (used = 11814)

I would expect the overwritten r to have its memory released by the garbage collector.
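
For readers on current Julia, where remotecall takes the function first, one way to avoid waiting for the garbage collector is to call finalize on an unfetched Future before dropping it. This is only a sketch of that idea, not a fix discussed in the report; the array size is reduced and the names are illustrative.

```julia
using Distributed

if nprocs() == 1
    addprocs(1)
end
w = first(workers())

r = remotecall(rand, w, 1000, 1000)   # current argument order: function first
wait(r)                               # the matrix now exists on the worker
finalize(r)                           # release the remote value explicitly

r = remotecall(rand, w, 1000, 1000)   # rebind r to a new remote result
n = length(fetch(r))                  # copy the new matrix to the caller
```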

Closures are available automatically on parallel workers, but normal functions are not

On Julia 0.5, I encountered the following strange behavior.

addprocs(4)
function adder_gen(n)
    function add(x)
        x+n
    end
end
f_closure=adder_gen(5)
fetch(@spawn f_closure(5))

Normally, I would expect the last line not to work. There was no @everywhere preceding the closure generation. If this were done with a normal function, an error would be generated.
I believe an explanation is in order. There is nothing in the manual that would suggest this behavior.

Need better control over worker logging

Currently, even if you define your own AbstractLogger and set Base.global_logger(mylogger), worker loggers still prepend "From worker X:" to all logs.

My current work-around is doing

using Distributed
function Distributed.redirect_worker_output(ident, stream)
           @schedule while !eof(stream)
               println(readline(stream))
           end
       end
using MyPkg
MyPkg.run()

So hurray for JuliaLang/julia#265 and all, but we really need better controls here.

Distributed remote references + precompilation

Currently remote refs (RemoteChannel, Future) are a bit of a silent killer if used in a precompiled module. Though our precompilation docs spell out in great detail the various constructors/patterns that should be avoided, nothing is mentioned about remote refs.

Digging into the implementation, however, reveals that remote refs use a global counter for self-identifying. It seems there's also likely to be issues with the worker id of the remote ref, but I didn't personally run into that (in my use-case, I was always creating RemoteChannels on pid 1 as "config variables" that all other worker processes could reference).

The problem identifies itself by producing invalid results: e.g. the first Future on pid 1 created at runtime will probably match the exact RRID of a precompiled RemoteChannel and hence isready, wait, fetch return results of the precompiled RemoteChannel instead of the Future. Quite a lovely surprise!

Obviously we want to avoid this situation of things seeming completely broken, so if that involves throwing explicit errors when precompiling a module w/ global RemoteChannel or Future variables, that seems safest to me. I'm not aware of all the precompilation magic that is available though in the case that we could actually make this work.

Ultimately, I think we need something more than just docs here.
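
Until something like that exists, a common Julia pattern for state that must not be baked into a precompiled image is to create it in the module's __init__ function, which runs at load time. The sketch below applies that pattern to a RemoteChannel. The module name ConfigRefs and the constant CONFIG are hypothetical, and this is an assumption about how one might sidestep the problem, not a fix proposed in the report.

```julia
module ConfigRefs

using Distributed

# Only an empty, typed container exists at precompile time.
const CONFIG = Ref{RemoteChannel{Channel{Any}}}()

# __init__ runs each time the module is loaded, so the remote reference
# gets its ID at run time rather than during precompilation.
function __init__()
    CONFIG[] = RemoteChannel(() -> Channel{Any}(1))
end

end # module

# Usage: store a value in the channel and read it back.
put!(ConfigRefs.CONFIG[], :ok)
value = take!(ConfigRefs.CONFIG[])
```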

Error with remotecall_fetch called on locally defined closures

Ref: discourse link

In cases where parallel data movement is a bottleneck, it would be useful to be able to use closures with pre-computed internal state parameters that are computed on each worker only once and not serialized from the master node.

Here are two examples. The first one is a simple case where gen_foo returns a closure generated on all workers. However remotecall_fetch gives an error. @amitmurthy gave a workaround in the link above but it would be nice to have a more direct approach.

The second example tries to use planned FFTs as internal states in the closures.

First example

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere function gen_foo(local_state)
           foo(x) = x * sum(local_state)
           return foo::Function
       end

julia> @everywhere foo = gen_foo(rand(100,100))

julia> remotecall_fetch(foo, 1, 10) #<-- works
50008.9259374688

julia> remotecall_fetch(foo, 2, 10) #<-- UndefVarError: #foo#9 not defined
ERROR: On worker 2:
UndefVarError: #foo#9 not defined
deserialize_datatype at ./serialize.jl:968
handle_deserialize at ./serialize.jl:674
deserialize at ./serialize.jl:634
handle_deserialize at ./serialize.jl:681
deserialize_msg at ./distributed/messages.jl:98
message_handler_loop at ./distributed/process_messages.jl:161
process_tcp_streams at ./distributed/process_messages.jl:118
#99 at ./event.jl:73
Stacktrace:
 [1] #remotecall_fetch#141(::Array{Any,1}, ::Function, ::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:354
 [2] remotecall_fetch(::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:346
 [3] #remotecall_fetch#144(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367
 [4] remotecall_fetch(::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367

Second example

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere function gen_bar(local_state, Δx, n)
           FFT = Δx / (2π) * plan_rfft(rand(n); flags=FFTW.PATIENT, timelimit=4)
           function bar(x)
               y = FFT * (x .* local_state)
               return y[1]
           end
           return bar::Function
       end

julia> @everywhere Δx, n = 0.1, 1000

julia> @everywhere bar = gen_bar(rand(n), Δx, n)

julia> x = rand(n);

julia> @everywhere x=$x

julia> @everywhere println(bar(x))    # <-- works
3.9819631247716307 + 0.0im
	From worker 2:	3.8079619702395155 + 0.0im
       From worker 3:	4.008786805675184 + 0.0im

julia> remotecall_fetch(bar, 1, x)   # <-- works
3.9819631247716307 + 0.0im

However I get UndefVarError: #bar#9 not defined for these

remotecall_fetch(bar, 2, x) # <-- UndefVarError: #bar#9 not defined

out = @parallel (vcat) for i = 1:nprocs() # <-- UndefVarError: #bar#9 not defined
    bar(x)
end

out = @parallel (vcat) for i = 1:nprocs() # <-- UndefVarError: #bar#9 not defined
        remotecall_fetch(bar, i, x)
end
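
One possible way around this, sketched under the assumption of a current Julia version, is to pass a named top-level wrapper that is defined on every process instead of passing the closure itself. The wrapper then looks up each worker's own closure. The name call_foo is hypothetical, and this is not necessarily the workaround given in the linked discussion. To make the per-process state visible, the state here is simply the process ID.

```julia
using Distributed

if nprocs() == 1
    addprocs(2)
end

@everywhere function gen_foo(local_state)
    foo(x) = x * sum(local_state)
    return foo
end

# Every process builds its own closure over its own local state.
@everywhere foo = gen_foo([myid()])

# A named wrapper defined everywhere; it calls the closure stored in the
# global foo of whichever process executes it.
@everywhere call_foo(x) = foo(x)

w = first(workers())
remote_result = remotecall_fetch(call_foo, w, 10)
```

Because the closure on worker w captured [w], remote_result is 10 times the worker's ID, which shows that the worker's local state was used rather than the caller's.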

remotecall_fetch vulnerable to hash collisions?

I've bumped into this behavior by accident after defining hash incorrectly for arrays. I'm absolutely not sure it indicates a bug, but I figured I'd better report it since this kind of exceptional situation is rarely tested.

It appears that when hash collisions happen, remotecall_fetch is not able to detect that the contents of an array have been updated. This can easily be reproduced by always returning the same hash for all arrays. This is surprising to me, as I would have expected that differing hashes are sufficient, but not necessary, to consider that two arrays are different, i.e. that isequal would always be called to check for hash collisions.

julia> using Distributed

julia> Base.hash(::AbstractArray, ::UInt) = zero(UInt)

julia> x = [1]
1-element Array{Int64,1}:
 1

julia> remotecall_fetch(()->x, 2)
1-element Array{Int64,1}:
 1

julia> x[1] = 2
2

julia> remotecall_fetch(()->x, 2) # Woops, not updated
1-element Array{Int64,1}:
 1

RemoteRef memory leak when serialized to a different worker

Scenario:

  • Master creates a RemoteRef on worker 2
  • RemoteRef is sent to Worker 3 as part of message, but not used/assigned/stored on 3
  • Reference to RemoteRef is removed from 1 and gc() called everywhere
  • Reference continues to exist on 2 since it has not received a "del msg" from 3.
julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> # create a reference on pid 2
       rr = RemoteRef(2)
RemoteRef(2,1,4)

julia> # See if anything has actually been created on worker 2
       Base.remote_do(2, ()->println(keys(Base.PGRP.refs)))

        From worker 2:  Any[]

julia> # Nope, nothing
       put!(rr, :OK)
RemoteRef(2,1,4)

julia> # Now, see again
       Base.remote_do(2, ()->println(keys(Base.PGRP.refs)))

        From worker 2:  Any[(1,4)]

julia> # It exists.

       # Let us send this reference to a 3rd worker.
       Base.remote_do(3, x->nothing, rr)

julia> # Check which workers are supposed to hold references to this RemoteRef
       Base.remote_do(2, ()->println(Base.PGRP.refs[(1,4)].clientset))

        From worker 2:  IntSet([1, 3])

julia> # 2 believes that 1 and 3 hold a reference

julia> # Clear locally and run gc()
       rr=nothing

julia> @everywhere gc()
julia> @everywhere gc()
julia> @everywhere gc()

julia> # 1 is cleared, but worker 2 believes that 3 continues to hold a reference
       Base.remote_do(2, ()->println(Base.PGRP.refs[(1,4)].clientset))

      From worker 2:  IntSet([3])
julia>  

I have tracked it down to finalizers not being called on the RemoteRef. The finalizer sends a del_msg to the processes actually holding the value.

Finalizers are also not being called for regular objects when they are serialized to a remote worker.

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> # creates workers with pids 2 and 3

       @everywhere begin

       function finalize_foo(f)
           v = f.foo
           @schedule println("FOO finalized $v")
       end

       type Foo
           foo
           Foo(x) = (f=new(x); finalizer(f, finalize_foo); f)
       end

       function Base.serialize(s::SerializationState, f::Foo)
           invoke(serialize, Tuple{SerializationState, Any}, s, f)
       end

       function Base.deserialize(s::SerializationState, t::Type{Foo})
           f = invoke(deserialize, Tuple{SerializationState, DataType}, s, t)
           Foo(myid())
       end

       end

julia> Base.remote_do(3, x->nothing, Foo(0))
RemoteRef(3,1,6)

julia> @everywhere gc()
FOO finalized 0

julia> @everywhere gc()
julia> @everywhere gc()

As can be seen, Foo was not finalized on worker 3.

cc: @carnaval , @JeffBezanson

Distributed docs are a bit contradictory as to if it loads packages you are `using` on the manager process

The answer is:
Yes, it loads them (requires them, i.e. doesn't bring them into scope) on all workers when you call using on the manager,
but when you start a new worker, it doesn't start with the ones that are currently loaded on the manager process.

https://docs.julialang.org/en/v1/manual/parallel-computing/#code-availability-1
first says:

Finally, if DummyModule.jl is not a standalone file but a package, then using DummyModule will load DummyModule.jl on all processes, but only bring it into scope on the process where using was called.

Then later kind of contradicts that:

Note that workers do not run a ~/.julia/config/startup.jl startup script, nor do they synchronize their global state (such as global variables, new method definitions, and loaded modules) with any of the other running processes.

The second bit is kind of wrong.
It does synchronize loaded modules.

pmap performance regression: pmap(x->f(x,y), X) creates copies of y

This is a copy-paste of a discussion topic I created yesterday https://discourse.julialang.org/t/pmap-performance-regression-pmap-x-f-x-y-x-creates-copies-of-y/14221

Updating some code from 0.5 to 1.0 massively slowed pmap calls for our use case.

Briefly, distributing the computation of f(x,arg) over the set X seems to copy and send arg during each iteration. This becomes a problem when the parameters in arg include large objects.

This can be reproduced in 0.6+ (tested 0.6.4 and 1.0.0). Benchmarks below are for a fresh 1.0 install on a Windows machine (also reproduced on a Linux HPC).

using BenchmarkTools
VERSION.major < 1 || using Distributed
addprocs() ##4
@everywhere begin
  bigarr = ones(10^8)
  f_passall(a,x) = length(x) + a
end
its = 1:20
julia> @btime map(x->f_passall(x,bigarr), its);
  940.280 ns (27 allocations: 736 bytes)
julia> @btime pmap(x->f_passall(x,bigarr), its);
  2.283 s (1560 allocations: 97.86 KiB)

Redefining f to use bigarr as a global variable seems to fix the issue, at a cost

 @everywhere f_globals(a) = length(bigarr) + a
  julia> @btime map(x->f_globals(x), its);
    1.391 μs (47 allocations: 1.03 KiB)
  julia> @btime pmap(x->f_globals(x), its);
    881.018 μs (1493 allocations: 96.64 KiB)

Increasing the number of iterations further slows down the pmap call, proportionally

its = 1:50;
  julia> @btime pmap(x->f_passall(x,bigarr), its);
    5.676 s (3834 allocations: 185.53 KiB)
  julia> @btime pmap(x->f_globals(x), its);
    2.169 ms (3658 allocations: 182.25 KiB)

The issue did not seem to occur as of 0.5.0: f_passall and f_globals have comparable performance, and most of the time is spent on overhead (remaining about constant with greater its).

  julia> @time pmap(x->f_passall(x,bigarr), 1:20);
    0.290894 seconds (422.72 k allocations: 17.810 MB, 2.42% gc time)

  julia> @time pmap(x->f_passall(x,bigarr), 1:50);
    0.290469 seconds (427.01 k allocations: 17.937 MB, 2.49% gc time)

  julia> @time pmap(x->f_globals(x), 1:20);
    0.276240 seconds (422.46 k allocations: 17.765 MB)

  julia> @time pmap(x->f_globals(x), 1:50);
    0.288293 seconds (426.70 k allocations: 17.921 MB, 2.39% gc time)
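
The API list in the README describes CachingPool as caching functions, including closures that capture large data, on each process. A sketch of using it here follows. It assumes two local workers and a smaller array than in the benchmarks, and it is not confirmed in this report as a fix for the regression.

```julia
using Distributed

if nprocs() == 1
    addprocs(2)
end

@everywhere f_passall(a, x) = length(x) + a

bigarr = ones(10^6)
pool = CachingPool(workers())

# Capture the array in a local binding so the closure carries it; the pool
# then caches the closure on the workers instead of resending it.
results = let bigarr = bigarr
    pmap(a -> f_passall(a, bigarr), pool, 1:20)
end
```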

interrupt() on a freshly opened julia crashes workers

when I issue interrupt() directly after opening Julia with
~/julia/julia -p2

I always get an error and some of the workers get terminated:

julia> interrupt()
fatal: error thrown and no exception handler available.
InterruptException()
rec_backtrace at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
jl_throw at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
unknown function (ip: 0x7f168ca7b30f)
unknown function (ip: 0x7f168ca7b379)
unknown function (ip: 0x7f168c313690)
syscall at /lib64/libc.so.6 (unknown line)
uv__epoll_wait at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
uv__io_poll at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
uv_run at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
process_events at ./stream.jl:713
wait at ./task.jl:360
task_done_hook at task.jl:174
jl_apply_generic at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
unknown function (ip: 0x7f168ca68eca)
unknown function (ip: (nil))
fatal: error thrown and no exception handler available.
InterruptException()
rec_backtrace at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
jl_throw at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
unknown function (ip: 0x7f8ae739b30f)
unknown function (ip: 0x7f8ae733f3f9)
jl_trampoline at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
jl_apply_generic at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
jl_uv_closeHandle at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
uv_run at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
process_events at ./stream.jl:713
wait at ./task.jl:360
task_done_hook at task.jl:174
jl_apply_generic at /root/Julia/usr/bin/../lib/libjulia.so (unknown line)
unknown function (ip: 0x7f8ae7388eca)
unknown function (ip: (nil))
Worker 2 terminated.
julia> Worker 3 terminated.ERROR (unhandled task failure): EOFError: read end of file
julia> 
ERROR (unhandled task failure): EOFError: read end of file

versioninfo:

Julia Version 0.4.4-pre+35
Commit 1a9429e (2016-02-12 13:21 UTC)
Platform Info:
System: Linux (x86_64-pc-linux-gnu)
CPU: AMD Phenom(tm) II X4 955 Processor
WORD_SIZE: 64
BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Barcelona)
LAPACK: libopenblas64_
LIBM: libopenlibm
LLVM: libLLVM-3.3

`@distributed` fails on loops over iterators

Distributed for loops over iterators fail, seemingly because @distributed expects the iterator to implement getindex. It makes sense that @distributed needs getindex, but I don't see that in the docs. The possible solutions seem to be:

  1. Update the docs for @distributed to mention this requirement.
  2. Update the implementation of @distributed to work for iterators.

We could do option 1 until option 2 is finished.

Here's an example:

@distributed (*) for (i, j) in Base.Iterators.product([1, 2], [3, 4])
    i + j
end
ERROR: LoadError: MethodError: no method matching getindex(::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64)
(::getfield(Main, Symbol("##7#8")))(::typeof(+), ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64, ::Int64) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:275
(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:339
run_work_thunk(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}, ::Bool) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
#remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
remotecall_fetch(::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
#remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406
remotecall_fetch at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406 [inlined]
(::getfield(Distributed, Symbol("##167#168")){typeof(+),getfield(Main, Symbol("##7#8")),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Array{UnitRange{Int64},1},Int64,Int64})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:259
Stacktrace:
 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] wait(::Condition) at ./event.jl:46
 [4] wait(::Task) at ./task.jl:188
 [5] fetch at ./task.jl:202 [inlined]
 [6] iterate at ./generator.jl:47 [inlined]
 [7] collect(::Base.Generator{Array{Task,1},typeof(fetch)}) at ./array.jl:619
 [8] preduce(::Function, ::Function, ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:263
 [9] top-level scope at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:274
 [10] include at ./boot.jl:317 [inlined]
 [11] include_relative(::Module, ::String) at ./loading.jl:1044
 [12] include(::Module, ::String) at ./sysimg.jl:29
 [13] include(::String) at ./client.jl:392
 [14] top-level scope at none:0
in expression starting at /home/dan/julia/test.jl:3

This is surprising, since the following non-parallel reduction works:

julia> reduce((*), [i + j for (i, j) = Base.Iterators.product([1, 2], [3, 4])])  
600

Adding a collect to the distributed version fixes it:

@distributed (*) for (i, j) in collect(Base.Iterators.product([1, 2], [3, 4]))
    i + j
end

Output: 600

Julia Version 1.0.2
Commit d789231e99* (2018-11-08 20:11 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.0 (ORCJIT, skylake)
Environment:
  JULIA_REVISE_INCLUDE = 1
  JULIA_DEBUG = all

Large number of temporary shared arrays crashes Julia. Need a more aggressive gc?

On my computer julia always crashes on this code:

@everywhere const m     = 100
@everywhere const n     = 100
@everywhere const T     = 100

@everywhere @inline function slice!(x, t, p)
    #   do something
end

function shared_test(p)
    x   = SharedArray(Float64, (m, n, T), init= S -> S[localindexes(S)] = 0.0)

    @sync @parallel for t= 1:T
        slice!(x, t, p)
    end
    return x
end 

function doit()
    p   = rand(m,n,T)

    for i=1: 10000000000
        x = shared_test(p)
        println(i)
    end
end

doit()

This triggers a signal (7): Bus error, which results in the workers being killed.

I tested this, because I wanted to isolate a memory leak that I suspect. I think it also occurs here, but Julia crashes before I can tell.

I think, if I leave out the Initialization:

x   = SharedArray(Float64, (m, n, T), init= S -> S[localindexes(S)] = 0.0)

by changing the line to

x   = SharedArray(Float64, (m, n, T))

there is no memory leak, but then there are other errors. I am also not sure if the parallel loop is relevant for this.
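
For comparison, here is a sketch of the same pattern in current syntax, assuming Julia 1.x with the SharedArrays stdlib, where localindexes is spelled localindices and @parallel has become @distributed. It calls finalize on the array once it is no longer needed, which is one way to release short-lived shared arrays without waiting for garbage collection. The sizes and the name shared_fill are illustrative.

```julia
using Distributed

if nprocs() == 1
    addprocs(2)
end
@everywhere using SharedArrays

function shared_fill(m, n)
    # Each participating process zeroes the part of the array it owns.
    x = SharedArray{Float64}((m, n); init = S -> S[localindices(S)] .= 0.0)

    # Columns are filled in parallel; @sync waits for all workers to finish.
    @sync @distributed for j in 1:n
        x[:, j] .= j
    end
    return x
end

x = shared_fill(4, 6)
total = sum(x)   # read the result before releasing the array
finalize(x)      # release the shared segment explicitly
```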

versioninfo:

Julia Version 0.4.4-pre+35
Commit 1a9429e (2016-02-12 13:21 UTC)
Platform Info:
  System: Linux (x86_64-pc-linux-gnu)
  CPU: AMD Phenom(tm) II X4 955 Processor
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Barcelona)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.3

Setup worker-worker connections lazily

The default all_to_all topology connects all processes to each other. While this is fine for small clusters, the total number of TCP connections increases rapidly as (N^2)/2.

Considering that a large class of parallel problems only need master-worker connections, we should change the default topology to all_to_all_lazy, where worker-worker connections are set up only on the first request from one worker to another. We should also introduce another topology, master_routed, which only connects the master to workers and, in the case of a worker-worker call, routes the request through the master.

To summarize, implement 2 new topologies:

  1. all_to_all_lazy where worker-worker connections are set up lazily, and is the default for addprocs and

  2. master_routed in which only the master connects to workers and worker-worker messages are routed via the master.
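
For orientation, addprocs already accepts a topology keyword. The sketch below starts workers that are connected only to the master, using the :master_worker value that also appears in a later report on this page. It illustrates the existing keyword rather than the two topology names proposed here, and it assumes a fresh session with no workers yet.

```julia
using Distributed

# Workers connect to process 1 only; no worker-worker connections are set up.
new_ids = addprocs(2; topology=:master_worker)

# Master-to-worker calls work as usual under this topology.
answers = [remotecall_fetch(myid, p) for p in new_ids]
```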

pmap feature request to facilitate dynamic scheduling

Code that is called by pmap() should have the ability to signal to the scheduler a (dynamic) determination not to schedule any more new tasks. Existing tasks should either be finished or aborted, depending on the signal.

For example, after any one task function has signaled “TASKSNONEW”, the scheduler should stop scheduling further processes but let the remaining tasks finish (e.g., we already found the alien; when the search parties come back, don’t send them out again). When any one task function signals “TASKSALLABORT”, the scheduler should additionally terminate all other tasks immediately (e.g., the alien has found us, is hostile, and we will need all search parties to come back immediately to help defend the base).

I think this is in the spirit of pmap, but greatly improves its previously static functionality to become dynamic. I also hope that it can be implemented with relatively little pain to the developers (catch interrupt?) and to the user. It becomes much easier for the user to signal “done” than having to write their own schedulers, a more obscure task.

In addition, I think the pmap() call should have arguments to set timeouts, both for itself and for individual tasks.

Machinefile nonuniform install locations

There is little documentation for the format of machinefiles (not that it is very complex).

Currently machinefiles take the number of procs, host, and ssh flags only, and assume that the install location is the same across systems (not the case for some of my work). The only real way to solve the problem is to not use a host file. It doesn't seem unreasonable to add some sort of option to the machinefile (I could do it pretty easily after trying to track this thing down).

It's just a little more string parsing, just a "dir=" and "exename=" option.

Cross platform issues with Remote Workers / SSH Cluster Manager / Native Dependencies

Hi -- Using a head node (i.e. procid == 1) that is a mac on v0.3.6, I am trying to use linux based workers using SSHClusterManager.

I experience problems with e.g. using HDF5 --- the basic cause seems to be that

  • workers delegate include to node 1 (include == include_from_node1)
  • HDF5 (and many others) use BinDeps
  • BinDeps creates a custom deps.jl which is platform (and presumably even box) specific

so my linux boxes complain when they cannot locate the mac dylib

I've thought a bit about how to resolve this, but nothing obvious and elegant pops to mind. (For now I've just hacked my deps.jl on my mac to support both OS X and linux)

Have others seen this kind of issue? Is there some simple way to have the workers not pull code from node1 but simply rely on the locally installed packages?

I was thinking of hacking include_from_node1 in the .juliarc.jl on the linux boxes to simply not pull code from node1, but that seems a bit drastic -- any thoughts about whether this would work?

As an aside, while I can understand the motivation for include, using, etc. to work by delegating to node1 (e.g., to simplify code distribution), it does seem a bit difficult to do robustly or in a way that will scale nicely to dozens or hundreds of workers.

thanks

Order dependent module loading with `Distributed`

Discovered by @ararslan in his quest to get Nanosoldier back online.

In distributed mode using X should make X available on the worker nodes as a root module so that remotecall(X.f) works without a @everywhere X.

This correctly works:

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere begin
       function log_require(mod::Base.PkgId)
         @info "Loading pkg $mod $(Base.root_module_exists(mod)) on proc $(myid())"
       end
       push!(Base.package_callbacks, log_require)
       end

julia> using BenchmarkTools
[ Info: Loading pkg JSON [682c06a0-de6a-54ab-a142-c8b1cf79cde6] true on proc 1
[ Info: Loading pkg JSON [682c06a0-de6a-54ab-a142-c8b1cf79cde6] true on proc 2
[ Info: Loading pkg JSON [682c06a0-de6a-54ab-a142-c8b1cf79cde6] true on proc 3
[ Info: Loading pkg BenchmarkTools [6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf] true on proc 2
[ Info: Loading pkg BenchmarkTools [6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf] true on proc 3
[ Info: Loading pkg BenchmarkTools [6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf] true on proc 1

If the user says using X before addprocs we no longer trigger the callback on a subsequent using X,
thereby triggering JuliaLang/julia#28857 (comment)

julia> using Distributed

julia> using BenchmarkTools

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere begin
       function log_require(mod::Base.PkgId)
          @info "Loading pkg $mod $(Base.root_module_exists(mod)) on proc $(myid())"
      end
      push!(Base.package_callbacks, log_require)
      end

julia> using BenchmarkTools

julia> @everywhere using BenchmarkTools
[ Info: Loading pkg JSON [682c06a0-de6a-54ab-a142-c8b1cf79cde6] true on proc 2
[ Info: Loading pkg JSON [682c06a0-de6a-54ab-a142-c8b1cf79cde6] true on proc 3
[ Info: Loading pkg BenchmarkTools [6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf] true on proc 2
[ Info: Loading pkg BenchmarkTools [6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf] true on proc 3

Failed `addprocs` attempt hangs instead of timing out when attempting to connect via SSH

The following appears to hang without ever timing out on my local machine (I let it run for a few minutes before interrupting it):

julia> addprocs(["user@host"], tunnel=true)

Traceback from the interrupt:

ERROR: InterruptException:
 in parse_connection_info at multi.jl:1048
 in read_worker_host_port at multi.jl:1040
 in connect at managers.jl:224
 in create_worker at multi.jl:1201
 in setup_launched_worker at multi.jl:1148
 in anonymous at task.jl:447
 in sync_end at ./task.jl:413
 [inlined code] from task.jl:422
 in addprocs at multi.jl:1114
 in addprocs at managers.jl:52

I believe the actual "hanging" part of this is probably firewall-related on my end, but I posted this issue because the timeout I expected from reading the addprocs docs never occurs (neither the default 60-second timeout nor JULIA_WORKER_TIMEOUT seems to matter).

For clarity's sake, running ssh user@host in the shell works fine.
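Until the built-in timeout applies here, one possible stopgap is to bound the wait on the caller's side. The following is only a minimal sketch: `with_timeout` is a hypothetical helper, not part of Distributed, and it merely stops waiting; the underlying task (and any half-started SSH process) is not cancelled.

```julia
# Hypothetical helper (not part of Distributed): run `f` in a task and stop
# waiting after `seconds`. The task itself keeps running after a timeout.
function with_timeout(f, seconds::Real)
    task = @async f()
    # timedwait polls the callback and returns :ok once it yields true,
    # or :timed_out when the deadline passes first.
    status = timedwait(() -> istaskdone(task), Float64(seconds))
    status === :ok || error("operation did not finish within $seconds seconds")
    return fetch(task)
end
```

Under these assumptions, the call from this report could be wrapped as with_timeout(() -> addprocs(["user@host"], tunnel=true), 60).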

Memory leak using shared arrays on cluster?

Hello everyone,

I see the following strange behavior when starting multiple processes on a KNL node with Julia 1.0.1. I add some processes using addprocs(Sys.CPU_THREADS, topology=:master_worker). That alone already consumes roughly a quarter of the available memory (25 out of 96 GB). In my code I then allocate a large shared array (~10^6 elements) and compute its entries (just multiplications and sums; there should not be any further allocations). My job hangs when sharing the array or when trying to iterate over it via @sync @distributed. During that period memory consumption grows until a bus error occurs and the job is cancelled.

The same code runs fine on my local machine with 4 cores, with memory consumption stable.

Any ideas where that may come from? Can anyone reproduce something similar with a shared array of similar size and many processes?

memory is not being released aggressively enough during parallel processing

I have a large codebase that runs without any problems in 0.4.0, but shows symptoms of memory leak when running in 0.4.3.

Basically, memory usage increases after each call to @sync @parallel for. It seems that some of the memory allocated for the parallel processing is not freed. A call to gc() at the end of the loop does not fix the problem.

The figure below shows the memory allocation for the same script using 0.4.0 (first half of the graph) and 0.4.3, where it can be seen that the memory increases after each call to @sync @parallel for.

[Figure "processos": memory usage of the same script under 0.4.0 and 0.4.3]

This is the first time I have filed an issue, so I apologize if I am not using the proper words or methodology.

Calling pmap within threads hangs

Hi, for some reason the code snippet below hangs. Does anyone have an explanation as to why?

using Distributed
addprocs(2)
tasks = Vector{Task}(undef,4)
@sync for i in 1:4
       tasks[i] = Threads.@spawn begin
           pmap(x->x, WorkerPool(workers()), 1:10)
       end
end
fetch.(tasks)

Surprisingly, if I first run pmap(x->x, WorkerPool(workers()), 1:10) before running the code above, everything works as expected.

julia> versioninfo()
Julia Version 1.4.1
Commit 381693d3df* (2020-04-14 17:20 UTC)
Platform Info:
OS: Linux (x86_64-pc-linux-gnu)
CPU: Intel(R) Core(TM) i3-5005U CPU @ 2.00GHz
WORD_SIZE: 64
LIBM: libopenlibm
LLVM: libLLVM-8.0.1 (ORCJIT, broadwell)
Environment:
JULIA_NUM_THREADS = 4

intermittent warnings of forcibly interrupting busy workers

My code, which adds processes using addprocs and then parallelizes work using pmap, sometimes terminates with the following warning. It doesn't affect the output of my code in any way, but the warning shows up at the end, especially with scripts that run for a significant amount of time (over an hour at least).

┌ Warning: Forcibly interrupting busy workers
│   exception = rmprocs: pids [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24] not terminated after 5.0 seconds.
└ @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1234
┌ Warning: rmprocs: process 1 not removed
└ @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1030

I am not sure whether this is a machine-related issue or has something to do with the Distributed package.

`using` loads modules on workers but does not put exported bindings in Main

This may be intended, but it seems a bit awkward to me:

$ julia -p 1
[...]
  | | |_| | | | (_| |  |  Version 0.4.0-dev+1922 (2014-12-02 23:10 UTC)
 _/ |\__'_|_|_|\__'_|  |  Commit e4e1688* (0 days old master)
|__/                   |  x86_64-apple-darwin14.0.0

julia> using StatsBase

julia> @everywhere assert(isa(StatsBase.sample, Function))

julia> @everywhere assert(isa(sample, Function))
exception on 2: ERROR: sample not defined
 in eval at /usr/local/julia/base/sysimg.jl:7
 in anonymous at multi.jl:1395
 in anonymous at multi.jl:820
 in run_work_thunk at multi.jl:593
 in run_work_thunk at multi.jl:602
 in anonymous at task.jl:6

Obviously the workaround is @everywhere using StatsBase, but I think that using X should probably be equivalent to @everywhere using X, or if not, it should act exclusively on the main process. Instead it seems we get using X on the main process and require("X") on the workers.

`pmap_reduce` function

I have need for a pmap_reduce function.

In Julia v0.5 and earlier, I was able to customize the base pmap to provide what I needed.
However since v0.6, parallel processing including pmap has been overhauled and largely rewritten. To a non-expert like me, it is much more complex and impenetrable, compared to the simpler implementation of earlier versions, which is still given as an example in the Parallel Computing section of the manual. https://docs.julialang.org/en/stable/manual/parallel-computing/#Scheduling-1

Are there any issues with continuing in v0.6 and later to model my own pmap_reduce on the v0.5 pmap? What are the benefits of the new implementation of pmap?

Alternatively, would you consider a feature request for an official pmap_reduce?
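To make the request concrete, here is a minimal sketch of what such a function could look like when layered on top of the current pmap. The name `pmap_reduce` and its argument order are assumptions made for illustration; this version collects all mapped results on the calling process before reducing, so it does not reduce incrementally on the workers.

```julia
using Distributed

# Hypothetical helper: map `f` over `xs` on the workers via pmap,
# then reduce the collected results with `op` on the calling process.
pmap_reduce(op, f, xs) = reduce(op, pmap(f, xs))

addprocs(2)

# Sum of squares of 1..10, with the squaring done on the workers.
total = pmap_reduce(+, x -> x^2, 1:10)
```

A version that reduces partial results on each worker before sending them back would cut communication, at the cost of more machinery than shown here.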

addprocs: resolve home directory on workers instead of master

Right now the default choice will fail if you connect to a Linux system from a Mac, or if the username on the remote system differs from the username on the master process. E.g.

sh: 1: cd: can't cd to /Users/andreasnoack/julia-dev
ERROR: Unable to read host:port string from worker. Launch command exited with error?

Ctrl-C when master process is waiting for crashed workers

When the master Julia process is waiting in take!() on its own RemoteRef and the worker processes have all thrown exceptions, pressing Ctrl-C in the REPL results in

ERROR (unhandled task failure): InterruptException:
 in task_done_hook at ./task.jl:175

but doesn't bring the master out of the waiting state.

Anonymous functions with keyword arguments not automatically shipped to remote workers

Here's the code to reproduce (Julia 1.1.0):

using Distributed
addprocs(2)
f = (;x)->2x
pmap(1:2) do i
    f(x=i)
end

This will give an error like:

ERROR: On worker 2:
UndefVarError: ##7#8 not defined

As far as I can tell reading the docs, this should work. Also note that it does work if the function has no keyword arguments.

f = x->2x
pmap(1:2) do i
    f(i)
end

2-element Array{Int64,1}:
 2
 4
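As a possible workaround (a sketch only, not a fix for the underlying serialization problem), the keyword-argument function can be defined by name on every process with @everywhere, so nothing about it has to be shipped with the closure. The name `double` is made up for this example.

```julia
using Distributed
addprocs(2)

# Define the keyword-argument function by name on every process instead of
# relying on the anonymous function being shipped automatically.
@everywhere double(; x) = 2x

# The closure passed to pmap takes no keyword arguments itself; it only
# refers to the global `double`, which now exists on each worker.
result = pmap(i -> double(x = i), 1:2)
```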

Surprising and undocumented `import` handling in `Distributed.@everywhere`

Consider the following code:

    using Distributed
    addprocs(1)
    @everywhere begin
          # assume MyModule is defined in the path given
          push!(LOAD_PATH, "/path/to/my/module")
          import MyModule
    end

This fails with a LoadError, because @everywhere cherry-picks the imports for each everywhere block and runs them first regardless of the order of statements inside the block. Instead, one has to do this:

    # (...)
    @everywhere push!(LOAD_PATH, "/path/to/my/module")
    @everywhere import MyModule

This behaviour thoroughly breaks my internal parser, because these two blocks look equivalent (modulo an unnecessary synchronization point in the middle). Worse, the behaviour is not documented, and in fact contradicts the documentation.

Unfortunately, no rationale is given in the code as to why this was chosen. However, I think at the very least, one should amend the documentation with a corresponding note and add a rationale for this in the code.

One could maybe also issue a warning for @everywhere blocks which do not have import statements as their first statements. This would help poor newbies like me :)

Julia version: 1.2.0 (also present in master)

global constant not automatically transferred along with function call

After creating a shared array available to all processes, the array is unavailable on a worker unless a fetchfrom is called for that worker first. Here is a simple example demonstrating it. (Note: I run it with julia -p 4 test.jl, which is why Distributed isn't loaded explicitly.)

@everywhere using SharedArrays

const arr = SharedArray{Int64, 2}((5,5))

for i in 1:5
    arr[i, :] = fill(i, 5)
end

@everywhere function test()
    arr[myid(), myid()] = 10
    return arr[myid(), :]
end 

println(remotecall_fetch(test, 2)) # doesn't work, throws err arr is undefined
@fetchfrom 2 arr
println(remotecall_fetch(test, 2)) # works now
println(remotecall_fetch(test, 3)) # still doesn't work on this worker unless a fetchfrom is called for this processor too
println(arr)

The arrays I'm using are quite big, so running a fetchfrom for each running worker is not very efficient.
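One way to sidestep the global lookup is to pass the SharedArray as an explicit argument, so the worker receives a handle to the same shared segment with the call itself. This is a sketch under the assumption that all processes run on the same host; `set_diagonal!` is a name invented for this example.

```julia
using Distributed
addprocs(2)
@everywhere using SharedArrays

arr = SharedArray{Int64, 2}((5, 5))
for i in 1:5
    arr[i, :] = fill(i, 5)
end

# The array is a parameter here, not a global looked up on the worker.
@everywhere function set_diagonal!(a)
    a[myid(), myid()] = 10
    return a[myid(), :]
end

w = first(workers())
row = remotecall_fetch(set_diagonal!, w, arr)
```

Because the memory is shared, the write made on the worker is also visible through arr on the calling process.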

feature request: @distributed_threads and pmap_threads

In a setup where:

  • There are multiple worker processes (possibly on different servers in a cluster).
  • There are multiple threads in each process (possibly different amounts in different servers).

It isn't trivial to create such a setup - one needs to tweak launching worker processes to be multi-threaded. It would be easy if there was a command-line flag for julia that specified the number of threads, requested in JuliaLang/julia#34309. But it is still possible to create such a setup today with a bit of effort, and it is useful as all the threads in each worker process benefit from automatic shared memory "everything", rather than being restricted to constructs such as SharedArray. Of course this means one needs to be careful.

In such a scenario, the current behavior is very clear:

  • A @threads loop uses the threads of the current (main or worker) process.
  • A @distributed loop and pmap use a single thread in each worker process.

This has the advantage of simplicity and clarity. It also allows using a nested @threads in each iteration of @distributed or pmap to utilize all the threads in all the machines.
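The nested pattern mentioned above can be sketched roughly as follows. The function name and the chunking are assumptions made for illustration; the workers started by a plain addprocs may each have only one thread, in which case the inner loop simply runs serially.

```julia
using Distributed
addprocs(2)

# Each pmap task handles one chunk; inside the task, the worker's own
# threads (however many it has) share the iterations of the inner loop.
@everywhere function sum_squares_threaded(chunk)
    partial = zeros(Int, length(chunk))
    Threads.@threads for k in eachindex(chunk)
        partial[k] = chunk[k]^2
    end
    return sum(partial)
end

chunks = [1:5, 6:10]
total = sum(pmap(sum_squares_threaded, chunks))
```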

However, it would also be useful to have @distributed_threads and pmap_threads.

A @distributed_threads would statically allocate the same number of iterations for each thread across all the machines - that is, will allocate more iterations to worker processes with more threads, and then internally use @threads to execute these on each of the worker process threads. This would be the natural extension of @distributed, which uses static allocation of iterations to processes.

A pmap_threads would dynamically allocate tasks to each thread across all machines. The batch size, if specified, will individually apply to each thread. It might be useful to add a second batch group size (a positive number of batches) such that each worker process would get a whole group of batches at once, and use the threads to execute the smaller batches, to reduce the amount of cross-process coordination required. This would be the natural extension of pmap which uses dynamic allocation of iterations to processes.

include on workers has somewhat strange behavior

If I'm reading include_from_node1 correctly, we convert the path passed to include to an absolute path on the current worker, and then fetch the source from node 1. This makes include inside packages work, since the absolute path is relative to the path to the required file on node 1. However, this behavior seems more questionable for a bare include not inside a file that was reloaded or required (e.g. @spawnat 2 include("mycode.jl")), since in this case the absolute path is relative to the worker's current working directory, which might not exist on node 1. In that case, I think we should either:

  • include relative to the current working directory on node 1. (This is not so weird, since require and reload search the current working directory on node 1.)
  • include the file from the local file system. But this may be ill-advised. If you include a local file and it calls include, that should naturally refer to a local file. But if you include a local file and it calls require or using, do you load local code or code from node 1?
  • Throw some kind of error.

fetch(remotecall(...)) does not throw when nprocs() == 1

julia> using Distributed

julia> fetch(remotecall(error, default_worker_pool(), "hello"))
RemoteException(1, CapturedException(ErrorException("hello"), Any[(error(::String) at error.jl:33, 1), ((::Distributed.var"#137#138"{typeof(error),Tuple{String},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}})() at remotecall.jl:350, 1), (run_work_thunk(::Distributed.var"#137#138"{typeof(error),Tuple{String},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}, ::Bool) at process_messages.jl:79, 1), (run_work_thunk at process_messages.jl:88 [inlined], 1), ((::Distributed.var"#96#98"{Distributed.RemoteValue,Distributed.var"#137#138"{typeof(error),Tuple{String},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}})() at task.jl:333, 1)]))

julia> addprocs(1)
1-element Array{Int64,1}:
 2

julia> fetch(remotecall(error, default_worker_pool(), "hello"))
ERROR: On worker 2:
hello
error at ./error.jl:33
#103 at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:290
run_work_thunk at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:79
run_work_thunk at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:88
#96 at ./task.jl:333
Stacktrace:
 [1] #remotecall_fetch#143 at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/remotecall.jl:390 [inlined]
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID) at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/remotecall.jl:382
 [3] #remotecall_fetch#146 at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/remotecall.jl:417 [inlined]
 [4] remotecall_fetch at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/remotecall.jl:417 [inlined]
 [5] call_on_owner at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/remotecall.jl:490 [inlined]
 [6] fetch(::Future) at /home/takafumi/repos/watch/julia/usr/share/julia/stdlib/v1.4/Distributed/src/remotecall.jl:529
 [7] top-level scope at REPL[4]:1

julia> VERSION
v"1.4.0-DEV.297"

I expect fetch(remotecall(error, default_worker_pool(), "hello")) to behave the same way before and after addprocs(1).

`@distributed` fails silently when lacking `@sync` or reduction

For a simple example:

@distributed for (i, j) in Base.Iterators.product([1, 2], [3, 4])
    @show i + j; i + j
end

This should error, as in #57, but no error is reported. The code silently never runs the contents of the distributed loop. If you tack on a @sync or even put in a reduction, i.e. @distributed (*) then the error is reported properly.

This is relevant to my use-case where I set up a RemoteChannel and listen to it after running the @distributed loop asynchronously.

julia> versioninfo()
Julia Version 1.3.0
Commit 46ce4d7933 (2019-11-26 06:09 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.1 (ORCJIT, skylake)

Nested PMAP calls do not work

Hello,
the following code, which has nested pmap calls, hangs even if I run it with a single worker process (addprocs(1)).

using Distributed
@everywhere function inner_process(task_id)
    task_id^2
end

@everywhere function outer_process(job_id)
    inner_task  = collect(1:2)
    pmap(inner_process, inner_task)
end

function do_job(jobs_list) 
    pmap(outer_process, jobs_list)
end

jobs_list = collect(1:10)
do_job(jobs_list)

This is the version I am using

julia> versioninfo()
Julia Version 1.1.0
Commit 80516ca202 (2019-01-21 21:24 UTC)
Platform Info:
OS: Windows (x86_64-w64-mingw32)
CPU: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
WORD_SIZE: 64
LIBM: libopenlibm
LLVM: libLLVM-6.0.1 (ORCJIT, skylake)
Environment:
export JULIA_NUM_THREADS = 4
JULIA_DEPOT_PATH = C:\Program Files\ReSolver.DistributedJulia\depot
JULIA_EDITOR = C:\Users\AppData\Local\atom\atom.exe
JULIA_NUM_THREADS = 4
JULIA_PKGDIR = C:\Program Files\ReSolver.DistributedJulia\packages

Am I maybe misunderstanding the feature or is this an expected behavior?

Concurrency violation on interplay between Distributed and Base.Threads

I’m working on a distributed pipeline algorithm that uses several stages per worker process. IIRC tasks cannot hop between threads once they’ve been scheduled. Since I want my stages to potentially run in parallel, I tried to create non-sticky tasks by chaining each D.@spawnat with a T.@spawn. However, this setup keeps failing/crashing and I don’t understand why.

I boiled it down to a minimal example:

using Distributed, Base.Threads

const D = Distributed
const T = Threads

pids = addprocs(10)
wids = repeat(pids, inner=2)

conns = map(RemoteChannel, wids)
fst = first(conns)
lst = RemoteChannel()
push!(conns, lst)

@everywhere begin
    function stillepost(i, prev, next)
        message = take!(prev)
        put!(next, message)
        @info "Player $i done"
    end
end

players = []
for i in 1:length(wids)
    w = wids[i]
    c1 = conns[i]
    c2 = conns[i+1]
    p = D.@spawnat w fetch(T.@spawn stillepost(i, c1, c2))
    push!(players, p)
end

game = @async begin
    m1 = "gibberish"
    put!(fst, m1)
    m2 = take!(lst)
    @info "'$m1' turned into '$m2'; well done!"
end

wait.(players)
wait(game)

Player 2 fails with a concurrency violation:

julia> include("stillepost.jl")
[ Info: Player 1 done
ERROR: LoadError: On worker 2:
TaskFailedException:
concurrency violation detected
error at ./error.jl:33
concurrency_violation at ./condition.jl:8
assert_havelock at ./condition.jl:25 [inlined]
assert_havelock at ./condition.jl:48 [inlined]
assert_havelock at ./condition.jl:72 [inlined]
wait at ./condition.jl:102
wait_for_conn at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:193
check_worker_state at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:168
send_msg_ at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:176
send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:134 [inlined]
#remotecall_fetch#143 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:389
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:386
#remotecall_fetch#146 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:494
put! at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:595 [inlined]
stillepost at /Users/jonas/.../stillepost.jl:18
#3 at ./threadingconstructs.jl:169
wait at ./task.jl:267 [inlined]

Am I holding it wrong?

Julia Version 1.5.1
Commit 697e782ab8 (2020-08-25 20:08 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin19.5.0)
  CPU: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 4
  JULIA_PROJECT = @.

See also this post on discourse. foobar_lv2 suggested this might be a bug in Distributed.

Different remote copying behaviour with one process vs multiple

using Base.Test

function main()
    @testset begin
        rng = MersenneTwister(3742)
        numbers = pmap(1:1000) do _
            rand(rng)
        end

        @test all(numbers .== numbers[1])

        spawn_numbers = [(@fetch rand(rng)) for i = 1:1000]

        @test all(spawn_numbers .== spawn_numbers[1])

        pfor_numbers = @parallel vcat for i = 1:1000
            [rand(rng)]
        end

        @test all(pfor_numbers .== pfor_numbers[1])
    end
end

main()

There are three cases here to compare different behaviours with the different parallel methods but I really just care about the @fetch/@spawn case.

ericdavies@whitacre> julia -p 1 ~/rng_parallel_test.jl
test set: Test Failed
  Expression: all(pfor_numbers .== pfor_numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:20 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
Test Summary: | Pass  Fail  Total
  test set    |    2     1      3
ERROR: LoadError: Some tests did not pass: 2 passed, 1 failed, 0 errored, 0 broken.
 in finish(::Base.Test.DefaultTestSet) at test.jl:498
 in macro expansion at test.jl:681 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
while loading /Users/ericdavies/rng_parallel_test.jl, in expression starting on line 24
ericdavies@whitacre> julia ~/rng_parallel_test.jl
test set: Test Failed
  Expression: all(numbers .== numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:10 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
test set: Test Failed
  Expression: all(spawn_numbers .== spawn_numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:14 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
test set: Test Failed
  Expression: all(pfor_numbers .== pfor_numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:20 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
Test Summary: | Fail  Total
  test set    |    3      3
ERROR: LoadError: Some tests did not pass: 0 passed, 3 failed, 0 errored, 0 broken.
 in finish(::Base.Test.DefaultTestSet) at test.jl:498
 in macro expansion at test.jl:681 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
while loading /Users/ericdavies/rng_parallel_test.jl, in expression starting on line 24

When there are one or more additional processes, rng is copied. When there is only one process, it is not.

The reason the @parallel for case fails both times is due to the use of CachingPool, which may also be used for pmap in the future via JuliaLang/julia#21946.

Ideally I would like the behaviours to be consistent (though I don't need them to be deterministic).

propagate Ctrl-C on a `put!`, `take!`, etc to the remote process

julia> rr=RemoteRef(2)
RemoteRef(2,1,1)

julia> take!(rr)
^CERROR: InterruptException:
 in process_events at ./stream.jl:642
 in wait at ./task.jl:302
 in wait at ./task.jl:228
 in wait_full at ./multi.jl:631
 in remotecall_fetch at multi.jl:731
 in call_on_owner at ./multi.jl:778
 in take! at ./multi.jl:811

julia> put!(rr, 1)
RemoteRef(2,1,1)

julia> take!(rr)

The second take! does not get the data from the put! since Ctrl-C does not actually kill the spawned first take! task on the remote worker.

Interrupting the parallel API calls should interrupt the waiting tasks on the remote process too.

Parallel multiple for loop

I wanted to run a parallel nested for loop. The docs show how to write a nested for loop as a single outer loop:

for i = 1:2, j = 3:4
  println((i, j))
end

and an example for parallel for loop:

nheads = @parallel (+) for i=1:200000000
  int(rand(Bool))
end

How can I combine these concepts? I tried something along the lines of:

result = @parallel (hcat) for i=1:10,j=1:10
    [i^2,j^2]
end

but I get:

ERROR: syntax: invalid assignment location

Also tried with Iterators package:

julia> result = @parallel (hcat) for p in product(0:0.1:1,0:0.1:1)
         [p[1]^2, p[2]^2]
       end
exception on 1: ERROR: MethodError: `getindex` has no method matching getindex(::Iterators.Product, ::Int64)
Closest candidates are:
  getindex(::(Any...,), ::Int64)
  getindex(::(Any...,), ::Real)
  getindex(::FloatRange{T}, ::Integer)
  ...

 in anonymous at no file:1679
 in anonymous at multi.jl:1528
 in run_work_thunk at multi.jl:603
 in run_work_thunk at multi.jl:612
 in anonymous at task.jl:6
MethodError(getindex,(Iterators.Product(Any[0.0:0.1:1.0,0.0:0.1:1.0]),1))

I am on:

julia> versioninfo()
Julia Version 0.4.0-dev+2684
Commit 8938e3a (2015-01-13 22:01 UTC)
Platform Info:
  System: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i7-4700MQ CPU @ 2.40GHz
  WORD_SIZE: 64
  BLAS: libopenblas (NO_LAPACK NO_LAPACKE DYNAMIC_ARCH NO_AFFINITY Haswell)
  LAPACK: liblapack.so.3
  LIBM: libopenlibm
  LLVM: libLLVM-3.3
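For readers on current Julia, where @parallel has been renamed to @distributed, one way to combine the two ideas is to flatten the index grid into a vector first, since the loop range has to support integer indexing. This is a sketch under that assumption, using a small 3x3 grid rather than the sizes above.

```julia
using Distributed
addprocs(2)

# Flatten the 3x3 grid of (i, j) pairs into an indexable vector of tuples.
grid = vec(collect(Iterators.product(1:3, 1:3)))

# Each iteration yields a 2-element column; hcat glues the columns together.
result = @distributed (hcat) for p in grid
    [p[1]^2, p[2]^2]
end
```

The result is a 2x9 matrix with one column per (i, j) pair.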

rmprocs / addprocs racy

Node termination during node provisioning is not handled well, resulting in connect: connection refused (ECONNREFUSED) in connect_to_worker from the new worker to the terminating worker.

For an example, see: https://travis-ci.org/JuliaLang/julia/jobs/186141590

julia> p = addprocs(2)

julia> begin # try this a couple times
         @spawnat p[1] sleep(5)
         @show rmprocs(p[1]; waitfor=0)
         @show workers()
         @show p = addprocs(1)
       end
rmprocs(p[1]; waitfor=0) = :ok
workers() = [3,4,5]
ERROR: connect: connection refused (ECONNREFUSED)
 in yieldto(::Task, ::ANY) at ./event.jl:153
 in wait() at ./event.jl:186
 in wait(::Condition) at ./event.jl:27
 in stream_wait(::TCPSocket, ::Condition, ::Vararg{Condition,N}) at ./stream.jl:42
 in wait_connected(::TCPSocket) at ./stream.jl:258
 in connect at ./stream.jl:957 [inlined]
 in connect_to_worker(::String, ::Int16) at ./managers.jl:490
 in connect_w2w(::Int64, ::WorkerConfig) at ./managers.jl:453
 in connect(::Base.DefaultClusterManager, ::Int64, ::WorkerConfig) at ./managers.jl:387
 in connect_to_peer(::Base.DefaultClusterManager, ::Int64, ::WorkerConfig) at ./multi.jl:1516
 in (::Base.##598#600{WorkerConfig,Int64})() at ./task.jl:404
Error [connect: connection refused (ECONNREFUSED)] on 6 while connecting to peer 4. Exiting.
Worker 6 terminated.
ERROR (unhandled task failure): Version read failed. Connection closed by peer.

need precompile statements re-enabled for `addprocs` (with PR)

As discovered in https://discourse.julialang.org/t/help-with-binary-trees-benchmark-games-example/37307/13

❯ hyperfine -w1 "julia -p4 -E 'using Distributed; nprocs()'" "julia -E 'using Distributed; addprocs(); nprocs()'"
Benchmark #1: julia -p4 -E 'using Distributed; nprocs()'
  Time (mean ± σ):      2.040 s ±  0.010 s    [User: 5.563 s, System: 0.773 s]
  Range (min … max):    2.024 s …  2.054 s    10 runs
 
Benchmark #2: julia -E 'using Distributed; addprocs(); nprocs()'
  Time (mean ± σ):      1.785 s ±  0.014 s    [User: 5.337 s, System: 0.756 s]
  Range (min … max):    1.765 s …  1.816 s    10 runs
 
Summary
  'julia -E 'using Distributed; addprocs(); nprocs()'' ran
    1.14 ± 0.01 times faster than 'julia -p4 -E 'using Distributed; nprocs()''

Is there a reason spawning the extra processes with addprocs() is necessarily faster than spawning them with the -p command-line argument?

`pmap` still parallelizing after `rmprocs`?

The first call to pmap after using rmprocs to remove all the workers still behaves as if it is running in parallel. The second call behaves normally:

julia> using Distributed

julia> addprocs(5);

julia> @time @sync pmap(i -> sleep(1), 1:10);
  2.711441 seconds (251.94 k allocations: 13.460 MiB)

julia> wait(rmprocs(workers()...))

julia> @time @sync pmap(i -> sleep(1), 1:10);
  2.111553 seconds (192.76 k allocations: 9.978 MiB)

julia> @time @sync pmap(i -> sleep(1), 1:10);
 10.135405 seconds (126.43 k allocations: 6.459 MiB, 0.12% gc time)

Version info:

Julia Version 1.5.0
Commit 96786e22cc (2020-08-01 23:44 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin18.7.0)
  CPU: Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, haswell)

Heartbeats between master and worker processes.

Heartbeats between master and workers have been mentioned before. Seeing JuliaLang/IJulia.jl#8 triggered it again.

Suggesting a design:

  • The requirement is to provide an alternate communication channel to a Julia worker process that can be used for 2 things presently - a) heartbeats and b) querying the state of the process (including long running computation progress)
  • This has to be in a different thread since a long running compute bound task will not yield frequently enough
  • The handler also needs to be in C since core Julia is not yet thread-safe
  • Currently the accept_handler in multi.jl waits for connection requests from peer processes.
  • Upon a new connection it will now read a short 4-byte header field that specifies what type of connection it is.
  • If the client has sent "WRKR", it will call create_message_handler_loop as it currently does
  • If the client has sent "ADMN", it will disengage the socket from libuv, start a new OS thread and execute a C "admin" function in the new thread with this socket. Only one admin thread can be running at any time.
  • The "admin" function will currently just respond to "ping" messages with some state information about the worker process. Maybe also provide a means for https://github.com/timholy/ProgressMeter.jl to publish compute progress information which is returned with every ping. Will use a pipe to push information between the Julia thread and the admin thread.
  • If a ping message has not been received for a specified timeout (say 2 minutes) or this socket connection breaks, the worker exits if configured to do so.
  • The Julia master process also starts a new thread (C code) that continually sends these heartbeat messages to all connected workers and exposes the returned state information to the Julia thread. A show_workers() can pretty-print this information.

The whole heartbeat mechanism can be switched on via a command-line argument to the julia executable (default is off).

It seems workable enough and we don't need the admin thread to listen on a different port. Do let me know what you guys think.
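
The header dispatch and the ping timeout from the design above can be sketched in plain Julia. The proposal itself calls for a C handler on a separate thread; classify_connection and heartbeat_expired are hypothetical names used only for illustration.

```julia
# Hypothetical helper: classify a new connection by its 4-byte header.
function classify_connection(io::IO)
    header = String(read(io, 4))
    header == "WRKR" && return :worker   # regular message-handler loop
    header == "ADMN" && return :admin    # hand the socket to the admin thread
    return :unknown
end

# Hypothetical helper: true when no ping arrived within `timeout` seconds.
# The 2-minute default mirrors the example timeout in the proposal.
heartbeat_expired(last_ping, now; timeout = 120.0) = (now - last_ping) > timeout

kind = classify_connection(IOBuffer("ADMNping"))
expired = heartbeat_expired(0.0, 130.0)
```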

SharedArray not working on remote machines

I am trying to set up SharedArrays on remote machines, i.e. arrays shared among processes running on the same remote machine. Unfortunately, this doesn't seem to work. I am using Julia version 0.5.0-dev+749.

julia> addprocs(1) #add one process on the master node
1-element Array{Int64,1}:
 2

julia> println(procs())
[1,2]

julia> S = SharedArray(Int, (3,4), init = S -> S[localindexes(S)] = myid(), pids=Int[1,2])
3x4 SharedArray{Int64,2}:
 1  1  2  2
 1  1  2  2
 1  1  2  2

julia> using ClusterManagers

julia> remotes = addprocs(SlurmManager(2), nodes=1)
srun: job 1828134 queued and waiting for resources
srun: job 1828134 has been allocated resources
2-element Array{Int64,1}:
 3
 4

julia> for w in remotes #both remote processes are on the same machine
           println(remotecall_fetch(readall, w, `hostname`)) 
       end
mrc-bsu-tesla1

mrc-bsu-tesla1


julia> r = @spawnat remotes[1] S = SharedArray(Int, (3,4), init = S -> S[localindexes(S)] = myid(), pids=remotes)
RemoteRef{Channel{Any}}(3,1,14)

julia> fetch(r)
3x4 SharedArray{Int64,2}:
 #undef  #undef  #undef  #undef
 #undef  #undef  #undef  #undef
 #undef  #undef  #undef  #undef

julia> r = @spawnat remotes[1] S*eye(4) #convert to regular array
RemoteRef{Channel{Any}}(3,1,16)

julia> fetch(r)
ERROR: On worker 3:
UndefRefError: access to undefined reference
 [inlined code] from sharedarray.jl:294
 in copy_transpose! at abstractarray.jl:513
 in copy_transpose! at linalg/matmul.jl:349
 in generic_matmatmul! at linalg/matmul.jl:459
 in * at linalg/matmul.jl:144
 in anonymous at multi.jl:1330
 in anonymous at multi.jl:889
 in run_work_thunk at multi.jl:645
 in run_work_thunk at multi.jl:654
 in anonymous at task.jl:54
 in remotecall_fetch at multi.jl:731
 [inlined code] from multi.jl:368
 in call_on_owner at multi.jl:776
 in fetch at multi.jl:784
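
For readers on current Julia versions, the local constructor call from the transcript would look roughly like this. It is only a syntax translation, assuming the SharedArrays stdlib and one local worker; fill_with_pid! is a made-up name, and the sketch does not address the remote-machine failure reported above.

```julia
using Distributed

w = addprocs(1)[1]              # one extra local process
@everywhere using SharedArrays

# Hypothetical init function: each participating process writes its own
# process id into the chunk of the array assigned to it.
@everywhere fill_with_pid!(S) = (S[localindices(S)] .= myid(); nothing)

S = SharedArray{Int}((3, 4); init = fill_with_pid!, pids = [1, w])
```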

remove order dependency between "using" and "addprocs"

The following sequence resulted in the worker segfaulting and terminating.

  • using PTools
  • addprocs(1)
  • call a method in PTools. The method creates an anonymous function (in PTools code) and does a remotecall_fetch on worker 2 for executing the said anonymous function.
  • worker 2 terminates at a deserialize call.

Flipping the order of addprocs and using results in proper execution.

I suggest that we keep track of the modules loaded in process 1 and load the same modules on all workers whenever addprocs is called.
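
The ordering that worked in the report, written for current Julia, is sketched below. LinearAlgebra stands in for PTools here, purely as an assumption so the snippet has no external dependencies.

```julia
using Distributed

w = addprocs(1)[1]

# Load the package on every process once the workers exist.
@everywhere using LinearAlgebra

# The closure is created on the master and executed on the worker.
result = remotecall_fetch(() -> norm([3.0, 4.0]), w)
```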

module globals and parallel methods

Consider the following:

julia> addprocs(1)
1-element Array{Int64,1}:
 2

julia> @everywhere module Foo

       foo() = remotecall(2, ()->(global X=[1]))
       bar() = @everywhere X[1]=2

       end

julia> Foo.foo()
RemoteRef(2,1,6)

julia> Foo.bar()
exception on 1: exception on 2: ERROR: X not defined
 in eval at /home/amitm/Work/julia/julia/base/sysimg.jl:7
 in anonymous at multi.jl:1439
 in run_work_thunk at multi.jl:598
 in run_work_thunk at multi.jl:607
 in anonymous at task.jl:6

This is because foo() creates X as a global in module Foo, while the @everywhere call in bar() evaluates its expression in Main, where X is not defined.

Would it be appropriate to have the remote part of all parallel methods execute under the same module as the calling module? Is information about the calling module available to a function in Base?
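
On current Julia, one way to make the target module explicit is to call Core.eval on the worker with the module as an argument. This is a sketch of that idea, not the resolution adopted for this issue; the module layout is simplified from the example above.

```julia
using Distributed

w = addprocs(1)[1]

@everywhere module Foo
X = [1]
end

# Evaluate the expression inside Foo on the worker instead of in Main.
remotecall_fetch(Core.eval, w, Foo, :(X[1] = 2))

worker_value = remotecall_fetch(Core.eval, w, Foo, :(X[1]))
master_value = Foo.X[1]   # the master's copy of Foo.X is untouched
```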

Think about simplified scoping rules for parallel macros

There seems to be general confusion among users about exactly how variable references in an expression passed to one of the parallel macros are resolved, and about when a variable's value is transferred versus treated as a reference to a global variable in some module defined on the worker.

Part of the problem could be resolved with better documentation, but I also wonder if we can find a set of rules that is more intuitive and robust in the Julia 0.5 timeframe.

Here are some specific issues concerning symbol resolution in parallel macros that have already been brought up and that we could maybe make some headway on:

Define and implement a routable, extensible Julia "message"

Starting a discussion to cleanly specify a Julia message. This will help with:

  • swappable transports at a "message" level. For example, 0MQ or MPI. See https://github.com/JuliaParallel/MPI.jl/issues/60
  • messages that can be routed via intermediate nodes - important when we support other topologies.
  • implementation of reliable/guaranteed message delivery system
  • user defined messages - the subsystem should be able to transport and hand-off user defined message for processing by user code.
  • recovery from errors while deserializing messages. Well defined message boundaries should help.

A Julia message header could include

  • version
  • from_pid
  • to_pid
  • length in bytes
  • message_name - symbol used to identify the handler
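
To make the header fields concrete, here is a minimal sketch of how such a header could be written to and read from a stream. The type name, field widths and the one-byte name-length prefix are all assumptions chosen for illustration; nothing here reflects an agreed wire format.

```julia
# Hypothetical header type; the fields follow the list above.
struct MessageHeader
    version::UInt8
    from_pid::Int32
    to_pid::Int32
    nbytes::UInt32        # length of the message body in bytes
    message_name::Symbol  # identifies the handler
end

function write_header(io::IO, h::MessageHeader)
    name = string(h.message_name)
    # Fixed-width fields are written in host byte order.
    write(io, h.version, h.from_pid, h.to_pid, h.nbytes)
    write(io, UInt8(ncodeunits(name)))   # assumes names shorter than 256 bytes
    write(io, name)
    return io
end

function read_header(io::IO)
    version  = read(io, UInt8)
    from_pid = read(io, Int32)
    to_pid   = read(io, Int32)
    nbytes   = read(io, UInt32)
    namelen  = Int(read(io, UInt8))
    name     = Symbol(String(read(io, namelen)))
    return MessageHeader(version, from_pid, to_pid, nbytes, name)
end

# Round trip through an in-memory buffer.
original = MessageHeader(0x01, 1, 2, 64, :call_fetch)
buf = write_header(IOBuffer(), original)
seekstart(buf)
decoded = read_header(buf)
```

Because the header carries the body length, a receiver that fails to deserialize the body could skip nbytes and resume at the next message boundary.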

Decision: Use of `asyncmap` in pmap batch mode

pmap in batch mode uses a local asyncmap to process a batch - https://github.com/JuliaLang/julia/blob/9e3318c9840e7a9e387582ba861408cefe5a4f75/base/distributed/pmap.jl#L198

Considering that each computation in pmap is fairly large and batch sizes are small, an asyncmap would not add major overhead and, if the computation involves I/O, would be quite beneficial.

For example, if the input is a list of file names to be processed, it is efficient to interleave I/O and computation and hence a local asyncmap is a better fit.

This issue is to decide whether to:

  1. Keep it as it is, i.e., no change - the batch is processed using asyncmap

  2. Change it to a local map. If the computation involves I/O the caller would have to explicitly partition the input and the mapping function in turn would need to perform an asyncmap and a flatten on the final output.

  3. Add another keyword arg to pmap, batch_function=map. To use asyncmap, the caller would need to explicitly specify batch_function=asyncmap. A user defined function (for example one that uses @threads ) can also be specified.
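
Options 2 and 3 can be approximated today without changing pmap by partitioning the input by hand. In the sketch below, batched_pmap and its keyword names are hypothetical; batch_function plays the role of the keyword proposed in option 3.

```julia
using Distributed

# Hypothetical wrapper: split the input into batches, let pmap distribute
# the batches, and process each batch with `batch_function`.
function batched_pmap(f, xs; batch_size = 2, batch_function = map)
    batches = collect(Iterators.partition(xs, batch_size))
    results = pmap(batch -> batch_function(f, batch), batches)
    return reduce(vcat, results)   # flatten the per-batch results
end

with_map      = batched_pmap(x -> x^2, 1:5)
with_asyncmap = batched_pmap(x -> x^2, 1:5; batch_function = asyncmap)
```

With no workers added, pmap runs the batches on the master process, so the snippet also works in a single Julia session.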

`Distributed.send_msg()` silently drops messages containing methods not defined on workers

Example:

using Distributed
import Distributed: worker_from_id, MsgHeader, RemoteDoMsg, send_msg
w = worker_from_id(addprocs(1)[1])

# Put `@everywhere` before this to get it to work
print_message(args...) = println(args...)

# This works
send_msg(w, MsgHeader(), RemoteDoMsg(println, ("hello!",), ()))

# This doesn't
send_msg(w, MsgHeader(), RemoteDoMsg(print_message, ("hello!",), ()))

If you put @everywhere before the definition of print_message then everything works; if you don't, there is no error and the worker just silently ignores the message. I would expect an error telling me what went wrong. I'm not sure at what point things go wrong. It does not appear to be a runtime error in the called function, because it also happens when the message calls a function that is defined on the worker but passes it a callback that may not be defined there. Example:

using Distributed
import Distributed: worker_from_id, MsgHeader, RemoteDoMsg, send_msg
w = worker_from_id(addprocs(1)[1])

@everywhere function do_work(callback, args...)
    @info("Within do_work!")
    callback(args...)
end

# Put `@everywhere` before this to get it to work
print_message(args...) = println(args...)

# This works
send_msg(w, MsgHeader(), RemoteDoMsg(do_work, (println, "hello!",), ()))

# This doesn't
send_msg(w, MsgHeader(), RemoteDoMsg(do_work, (print_message, "hello!",), ()))

Notice how the "Within do_work" message is not emitted in the second case. My best guess is that something in the serialization code is throwing an error and that error is getting swallowed, but I haven't had time to debug this fully.
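
For comparison, the public remotecall_fetch API does report this situation, because it waits for a reply from the worker. A minimal sketch, assuming a single local worker (I have not traced the worker-side code path that drops the RemoteDoMsg):

```julia
using Distributed

w = addprocs(1)[1]

# Defined on the master only, as in the report above.
print_message(args...) = println(args...)

# The failure on the worker is sent back and rethrown on the caller.
caught = try
    remotecall_fetch(print_message, w, "hello!")
    nothing
catch err
    err
end
```

Here caught should be a RemoteException wrapping the worker-side error, which is the kind of feedback the send_msg path above does not give.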

Showing SharedArray objects on a different host causes confusion.

First, see the code below.

julia> pid = addprocs(["different_hostaddress_from_current"])[1]
julia> sa1 = @fetchfrom pid SharedArray( rand(4, 4) )
4×4 SharedArray{Float64,2}:
 0.588949  0.0481939  0.266658  0.542195
 0.162783  0.771512   0.812162  0.981772
 0.704765  0.325396   0.656134  0.469057
 0.326197  0.727144   0.696665  0.653014
julia> show(sa1)
[0.588949 0.0481939 0.266658 0.542195; 0.162783 0.771512 0.812162 0.981772; 0.704765 0.325396 0.656134 0.469057; 0.326197 0.727144 0.696665 0.653014]
julia> sa1[1,1]
0.0  # this is a trash value.
julia> sdata(sa1)
0×0 Array{Float64,2}
# in reverse
julia> sa2 = SharedArray( rand(4, 4) )
4×4 SharedArray{Float64,2}:
 0.913975  0.65263   0.388948  0.695337
 0.942663  0.251198  0.233023  0.376158
 0.539025  0.743089  0.612239  0.329394
 0.783334  0.425041  0.67182   0.381703
julia> @fetchfrom pid show(sa2)
julia> @fetchfrom pid println()
From worker 2:  [0.913975 0.65263 0.388948 0.695337; 0.942663 0.251198 0.233023 0.376158; 0.539025 0.743089 0.612239 0.329394; 0.783334 0.425041 0.67182 0.381703]
julia> @fetchfrom pid show(sa2[1,1])
julia> @fetchfrom pid println()
From worker 2:  0.0

I understand the intention.
The show function is implemented as follows:
show(io, remotecall_fetch(sharr->sharr.s, S.pids[1], S))
So it actually shows the values held on the other machine.
However, users may mistakenly assume that the variable sa1 on the current node holds those values.

I think this behavior should be documented, or show should be changed, to avoid such mistakes.
I ran into this problem myself.
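
Until this is documented, a defensive check before indexing may help. The heuristic below is taken from the transcript above, where sdata returned a 0×0 array on the process that had no local mapping; has_local_data is a hypothetical helper, and I have only reasoned about the single-host case.

```julia
using SharedArrays

# Hypothetical helper: the locally mapped backing array should have the
# same size as the SharedArray itself when the data is reachable here.
has_local_data(S::SharedArray) = size(sdata(S)) == size(S)

S = SharedArray{Float64}((2, 2))
S .= 1.0

total = has_local_data(S) ? sum(S) : error("SharedArray is not mapped in this process")
```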
