
goflow's Introduction

GoFlow - Dataflow and Flow-based programming library for Go (golang)


Status of this branch (WIP)

Warning: you are currently on the v1 branch of GoFlow. v1 is a revisit and refactoring of the original GoFlow code, which remained almost unchanged for 7 years. This branch is a work in progress: stability is not guaranteed and the API may change.

If your code depends on the old implementation, you can build it using release 0.1.

--

GoFlow is a lean and opinionated implementation of Flow-based programming in Go that aims at designing applications as graphs of components which react to data that flows through the graph.

The main properties of the proposed model are:

  • Concurrent - graph nodes run in parallel.
  • Structural - applications are described as components, their ports and connections between them.
  • Reactive/active - system's behavior is how components react to events or how they handle their lifecycle.
  • Asynchronous/synchronous - there is no determined order in which events happen, unless you demand such an order.
  • Isolated - sharing is done by communication, state is not shared.

Getting started

If you don't have the Go compiler installed, read the official Go install guide.

Use the go tool to install the package in your packages tree:

go get github.com/trustmaster/goflow

Then you can use it in the import section of your Go programs:

import "github.com/trustmaster/goflow"

Basic Example

Below is a listing of a simple program running a network of two processes.

Greeter example diagram

The first one generates greetings for given names, the second one prints them on screen. The example demonstrates how components and graphs are defined and how they are embedded into the main program.

package main

import (
	"fmt"
	"github.com/trustmaster/goflow"
)

// Greeter sends greetings
type Greeter struct {
	Name           <-chan string // input port
	Res            chan<- string // output port
}

// Process incoming data
func (c *Greeter) Process() {
	// Keep reading incoming packets
	for name := range c.Name {
		greeting := fmt.Sprintf("Hello, %s!", name)
		// Send the greeting to the output port
		c.Res <- greeting
	}
}

// Printer prints its input on screen
type Printer struct {
	Line <-chan string // inport
}

// Process prints a line when it gets it
func (c *Printer) Process() {
	for line := range c.Line {
		fmt.Println(line)
	}
}

// NewGreetingApp defines the app graph
func NewGreetingApp() *goflow.Graph {
	n := goflow.NewGraph()
	// Add processes to the network
	n.Add("greeter", new(Greeter))
	n.Add("printer", new(Printer))
	// Connect them with a channel
	n.Connect("greeter", "Res", "printer", "Line")
	// Our net has 1 inport mapped to greeter.Name
	n.MapInPort("In", "greeter", "Name")
	return n
}

func main() {
	// Create the network
	net := NewGreetingApp()
	// We need a channel to talk to it
	in := make(chan string)
	net.SetInPort("In", in)
	// Run the net
	wait := goflow.Run(net)
	// Now we can send some names and see what happens
	in <- "John"
	in <- "Boris"
	in <- "Hanna"
	// Send end of input
	close(in)
	// Wait until the net has completed its job
	<-wait
}

This looks a bit heavy for such a simple task, but FBP is aimed at more complex things than just printing on screen. In more complex and realistic examples the infrastructure pays off.

You probably have one question left even after reading the comments in the code: why do we need to wait for the finish signal? The flow-based world is asynchronous: while you expect things to happen in the same sequence as they appear in main(), at run time they don't necessarily follow that order, and the application might terminate before the network has done its job. To avoid this, we listen on the network's wait channel for the signal that is sent when the network finishes its job.
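The need to wait can be reproduced without GoFlow at all. The sketch below is plain Go, not GoFlow's implementation: a hypothetical runPrinter helper consumes an input channel in its own goroutine and returns a channel that is closed when the goroutine finishes, which plays the role of the wait channel in the example above.

```go
package main

import "fmt"

// runPrinter is a hypothetical stand-in for a running network. It collects
// lines in its own goroutine and closes the returned channel when the
// input is exhausted.
func runPrinter(in <-chan string, out *[]string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for line := range in {
			*out = append(*out, line)
		}
	}()
	return done
}

func main() {
	in := make(chan string)
	var printed []string
	done := runPrinter(in, &printed)
	in <- "John"
	in <- "Boris"
	close(in)
	// Without this receive, main could return (or read printed) before
	// the goroutine has handled the last line.
	<-done
	fmt.Println(printed)
}
```

Removing the final receive makes the result depend on goroutine scheduling, which is the confusion the wait channel is meant to avoid.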

Terminology

Here are some Flow-based programming terms used in GoFlow:

  • Component - the basic element that processes data. Its structure consists of input and output ports and state fields. Its behavior is the set of event handlers. In OOP terms Component is a Class.
  • Connection - a link between 2 ports in the graph. In Go it is a channel of specific type.
  • Graph - components and connections between them, forming a higher level entity. Graphs may represent composite components or entire applications. In OOP terms Graph is a Class.
  • Network - is a Graph instance running in memory. In OOP terms a Network is an object of Graph class.
  • Port - is a property of a Component or Graph through which it communicates with the outer world. There are input ports (Inports) and output ports (Outports). For GoFlow components it is a channel field.
  • Process - is a Component instance running in memory. In OOP terms a Process is an object of Component class.

More terms can be found in Flow-based Wiki Terms and FBP wiki.

Documentation

Contents

  1. Components
    1. Ports and Events
    2. Process
    3. State
  2. Graphs
    1. Structure definition
    2. Behavior

Package docs

Documentation for the flow package can be accessed using the standard godoc tool, e.g.

godoc github.com/trustmaster/goflow

Links

Here are related projects and resources:

TODO

  • Integration with NoFlo-UI/Flowhub (in progress)
  • Distributed networks via TCP/IP and UDP
  • Reflection and monitoring of networks

goflow's People

Contributors

abferm, akokhanovskyi, davidkbainbridge, kortschak, lrgar, mtojek, sascha-andres, trustmaster, v1xingyue


goflow's Issues

Design guidelines for information packets

Hi @trustmaster,

I wanted to ask for your input regarding design of GoFlow components, and thought the discussion might be useful to others, so I added it here (or is there some better place?).

I was wondering what is the recommended strategy for designing information packets?

I know that FBP suggests to use "structured data packets", which I guess would correspond to some kind of structs with predefined fields etc.

At the same time, it seems good to support a bit of flexibility here. For example, in the blow "library" there are a lot of components that could suitably use simple lines of text (or byte slices/arrays) to maximize performance, because of the prevalence of text-based formats in bioinformatics.

I suppose that I will need to use more structured IP formats for other use cases though. That is why I was wondering whether it might be worthwhile to have some recommended general IP format, to increase interoperability between different component libraries.

As somebody pointed out though, the FBP pattern itself makes it very easy to create mappers between different IP formats, so this might be less of an issue in practice, but I wanted to at least raise it for discussion!
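To illustrate the mapper idea, here is a minimal sketch of a component that converts text-line IPs into structured IPs. The Record type, the tab-separated layout, and the LineToRecord component are all assumptions made up for this example; the component follows the shape of the v1 components shown in the README, and the wiring in main is done by hand rather than through the library.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is a hypothetical structured IP with two fields.
type Record struct {
	ID  string
	Seq string
}

// parseLine maps one tab-separated line to a Record.
func parseLine(line string) (Record, error) {
	parts := strings.SplitN(line, "\t", 2)
	if len(parts) != 2 {
		return Record{}, fmt.Errorf("malformed line %q", line)
	}
	return Record{ID: parts[0], Seq: parts[1]}, nil
}

// LineToRecord is a mapper between the two IP formats.
type LineToRecord struct {
	Line   <-chan string
	Record chan<- Record
}

// Process converts every incoming line; malformed lines are dropped here,
// which a real component would more likely report on an error port.
func (c *LineToRecord) Process() {
	for line := range c.Line {
		if rec, err := parseLine(line); err == nil {
			c.Record <- rec
		}
	}
}

func main() {
	lines := make(chan string)
	records := make(chan Record)
	m := &LineToRecord{Line: lines, Record: records}
	go func() {
		m.Process()
		close(records) // emulates the network closing the outport
	}()
	go func() {
		lines <- "seq1\tACGT"
		close(lines)
	}()
	for r := range records {
		fmt.Println(r.ID, r.Seq)
	}
}
```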

v1 Design proposal

7 years after starting this project, after writing and maintaining some FBP apps in production, and after collecting feedback on this project (#48), it's time to propose the design for v1.0 of the GoFlow library. The main goal is to make the library itself less complicated and to minimize the effort needed to go from zero to a real app.

Steps

  • Updated basic Components and Graphs in pure Go
  • Updated component Registry and Factory
  • FBP DSL parser and loader written in GoFlow

Components

First, to reduce complexity, the following features are removed:

  • Sync/Async mode
  • Worker pool
  • State locks
  • Lifecycle management (initializer/shutdowner)

It is still possible to implement all that functionality in the components themselves in pure Go code.

Structure

The structure definition of the components will be simplified:

type ExampleComponent struct {
	Foo <-chan int  // inport
	Bar chan<- int  // outport
}

Process

Reactive port handlers are removed in favour of a more generic Process() function, which defines the whole behaviour of the component.

So, long-running ("looper") components can be implemented like this:

func (c *ExampleComponent) Process() {
	for f := range c.Foo {
		// Do something with f; here it is passed through unchanged
		data := f
		c.Bar <- data
	}
}

A short-running ("non-looper") process would just return immediately after reading the input:

func (c *ExampleComponent) Process() {
	f := <-c.Foo
	// Do something with f; here it is passed through unchanged
	data := f
	c.Bar <- data
}

Graphs

API

The API for working with Graphs remains the same as before.

Underlying code is to be refactored, but no dramatic changes are expected. Yes, it will still use reflection to connect nodes in the network - because that brings a lot of convenience and run-time capabilities and only affects performance upon application start.
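As a rough illustration of what connecting nodes through reflection involves, the sketch below creates one channel and assigns it to named port fields of two structs. It is not GoFlow's code: the producer and consumer types and the portField, connect, and demoConnect helpers are hypothetical, and only standard reflect calls are used.

```go
package main

import (
	"fmt"
	"reflect"
)

// producer and consumer are hypothetical components with one port each.
type producer struct{ Out chan<- int }
type consumer struct{ In <-chan int }

// portField looks up an exported channel field by name on a pointer to a struct.
func portField(proc interface{}, name string) (reflect.Value, error) {
	v := reflect.ValueOf(proc)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return reflect.Value{}, fmt.Errorf("process must be a pointer to a struct")
	}
	f := v.Elem().FieldByName(name)
	if !f.IsValid() || f.Kind() != reflect.Chan || !f.CanSet() {
		return reflect.Value{}, fmt.Errorf("port %q is not a settable channel field", name)
	}
	return f, nil
}

// connect creates one channel and assigns it to both port fields.
func connect(sender interface{}, outPort string, receiver interface{}, inPort string) error {
	out, err := portField(sender, outPort)
	if err != nil {
		return err
	}
	in, err := portField(receiver, inPort)
	if err != nil {
		return err
	}
	if out.Type().Elem() != in.Type().Elem() {
		return fmt.Errorf("ports %q and %q carry different types", outPort, inPort)
	}
	// A bidirectional channel value is assignable to both directional fields.
	ch := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, out.Type().Elem()), 0)
	out.Set(ch)
	in.Set(ch)
	return nil
}

// demoConnect wires a producer to a consumer and passes one value through.
func demoConnect() (int, error) {
	p, c := new(producer), new(consumer)
	if err := connect(p, "Out", c, "In"); err != nil {
		return 0, err
	}
	go func() { p.Out <- 42 }()
	return <-c.In, nil
}

func main() {
	v, err := demoConnect()
	fmt.Println(v, err)
}
```

The reflection cost is paid once, when the channel is assigned; afterwards the components send and receive on ordinary Go channels.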

FBP files

One of the goals for v1 is to use the FBP DSL as the primary way to define graphs. So, instead of many lines of Go code, you could write something like this:

INPORT=Server.Port:Port
OUTPORT=Logger.Output:Log

Server(http/Server) Req -> Req Handler(app/Handler) Err -> Logger(log/Logger)
Middleware(http/Middleware) Funcs -> Middleware Server

Support for FBP files is built into runtime, so it can be called like:

n, err := flow.LoadGraph("app/main.fbp")
if err != nil {
	log.Fatal(err)
}

port := make(chan int)
n.SetInPort("Port", port)

wait := flow.Run(n)

port <- 8000

Runtime

For v1.0, the runtime should be powerful enough just to load and run .fbp graphs.

Support for FBP Protocol and Flowhub is to be added later. Though, existing work such as compatibility with JSON graph format will be kept to make future integration easier.

Handlers on a Looper are never called

A Looper can have inports, and handler functions can be defined for them. I think they should actually be handled. This could easily be fixed by moving the looper functionality here to its own goroutine that runs alongside the normal handler loop.

Active (classical) FBP components

Currently GoFlow components are reactive: we program them as reactions to data arriving at inports.

In classical FBP, a component's process is sitting in its own thread receiving data on its inports and sending output in a loop.

Implementing active components in GoFlow is relatively easy but requires some changes to the library:

  • Active components should have Loop() method which is called in a separate goroutine when the process is activated.
  • Within the Loop() method it uses Go channel operators (<-, = <- and select) to receive data from inports, send data to outports, etc.
  • Hybrid active/reactive components are currently unsupported.
  • All the other GoFlow things like Graphs, Connections and run-time capabilities don't require any changes to support Active components.
  • For Loopers Loop() is called when the network is started. Non-loopers are currently unsupported in favor of reactive components.
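A minimal sketch of what such an active component could look like is given below. The Summer type is hypothetical, and the goroutine that the library would start is launched by hand in main; only the body of Loop(), built from plain channel operators and select, is the point of the example.

```go
package main

import "fmt"

// Summer is a hypothetical active component with two inports and one outport.
type Summer struct {
	A, B <-chan int
	Sum  chan<- int
}

// Loop receives from whichever inport is ready and emits the running total.
// It returns once both inports are closed.
func (c *Summer) Loop() {
	a, b := c.A, c.B
	total := 0
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // a nil channel is never selected again
				continue
			}
			total += v
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			total += v
		}
		c.Sum <- total
	}
	close(c.Sum)
}

func main() {
	a, b, sum := make(chan int), make(chan int), make(chan int)
	c := &Summer{A: a, B: b, Sum: sum}
	go c.Loop() // in the proposal, the library would start this goroutine
	go func() { a <- 1; a <- 2; close(a) }()
	go func() { b <- 10; close(b) }()
	last := 0
	for v := range sum {
		last = v
	}
	fmt.Println(last) // 13: the final total does not depend on arrival order
}
```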

Not getting all output, and output order not preserved

Hello Vladimir!

Don't know if this is merely a support request, or a possible bug ...

I'm getting some problems with the following code: https://gist.github.com/samuell/6164115
... in combination with a few simple components from https://github.com/samuell/blow

... and using this file as input: https://gist.github.com/samuell/6164135 (just has to lie in the same folder), which is here of a very simple format, to make it easy to spot errors: One line "AAAA...", and the other line "CCC..." and so on, for 99 lines.

The program is supposed to simply convert 'A' -> 'T', 'T' -> 'A', 'C' -> 'G' and 'G' -> 'C' (to get the "complementary" DNA base sequence), back and forth 4 times, which should finally give back the same result as the input file.

I get very strange results.

Problem nr 1: I sometimes don't get all the output back. When I run the compiled program and pipe it directly to "wc", I DO get 99 lines, which is the same as the input file:

[samuel basecompl]$ ./basecompl_blow|wc -l
99

But if I pipe the output to a separate file and then count the lines, I get a varying number of lines of output ... like 63, 73, 88, etc. Seems like the output printing does not have time to finish before the pipe is closed?

[samuel basecompl]$ ./basecompl_blow > out.txt 
[samuel basecompl]$ wc -l out.txt 
68 out.txt

Problem nr 2 is that the output is not in the same order as the input. While the input looks like:

AAA...
CCC...
AAA...
CCC...
... and so on ..

.. the output has changed the order of the lines, in an irregular way. See here for an example: https://gist.github.com/samuell/6164186

IIP support in networks

To make GoFlow compliant with NoFlo and to implement #7 we need to add a concept of "Initial IPs" directly into flow.Graph, so that when a graph is loaded, information about initial packets is added to the graph, and when the network is run, those packets are sent automatically.

We didn't need it earlier because all packets were sent explicitly from Go code, now we need to support packets defined in graph's JSON definition.
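The idea can be sketched in a few lines of plain Go. The miniGraph type below is a toy stand-in, not flow.Graph, and its AddIIP and sendIIPs methods are hypothetical names: packets are recorded when the graph is loaded and delivered only when the network starts.

```go
package main

import "fmt"

// initialPacket is a hypothetical record of one IIP: a value bound for a named port.
type initialPacket struct {
	port  string
	value string
}

// miniGraph is a toy stand-in for a graph; it is not flow.Graph.
type miniGraph struct {
	ports map[string]chan string
	iips  []initialPacket
}

func newMiniGraph() *miniGraph {
	return &miniGraph{ports: make(map[string]chan string)}
}

// AddIIP records a packet at load time; nothing is sent yet.
func (g *miniGraph) AddIIP(port, value string) {
	g.iips = append(g.iips, initialPacket{port: port, value: value})
}

// sendIIPs delivers the recorded packets when the network starts.
// It reports an error for a packet whose target port does not exist.
func (g *miniGraph) sendIIPs() error {
	for _, p := range g.iips {
		ch, ok := g.ports[p.port]
		if !ok {
			return fmt.Errorf("IIP target port %q not found", p.port)
		}
		ch <- p.value
	}
	return nil
}

func main() {
	g := newMiniGraph()
	// Buffered so that the send does not block in this single-goroutine demo.
	g.ports["greeter.Name"] = make(chan string, 1)
	g.AddIIP("greeter.Name", "John")
	if err := g.sendIIPs(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(<-g.ports["greeter.Name"])
}
```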

Wait for current input processing to be finished before closing ports

Currently components are terminated immediately as soon as their inports are closed. But some input can still be in processing after the output is closed, this leads to "send on a closed channel" errors.

Additional synchronization should be added so that when inputs are closed, the component doesn't accept any input anymore, waits until running input handlers finish and closes outports afterwards.
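One common way to get this ordering in Go is a sync.WaitGroup, as in the sketch below. It is a generic pattern rather than the fix applied in GoFlow; the process and sumSquares helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// process handles each packet in its own goroutine and closes out only
// after every running handler has returned.
func process(in <-chan int, out chan<- int, handle func(int) int) {
	var wg sync.WaitGroup
	for v := range in {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			out <- handle(v)
		}(v)
	}
	// in is closed, so no new input is accepted. Wait for in-flight
	// handlers before closing the outport.
	wg.Wait()
	close(out)
}

// sumSquares pushes 1..n through process and adds up the results.
func sumSquares(n int) int {
	in, out := make(chan int), make(chan int)
	go process(in, out, func(v int) int { return v * v })
	go func() {
		for i := 1; i <= n; i++ {
			in <- i
		}
		close(in)
	}()
	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumSquares(5)) // 1+4+9+16+25 = 55
}
```

Closing out before wg.Wait() would reintroduce the "send on a closed channel" panic whenever a handler is still running.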

Sync mode of components with multiple inputs

Currently sync mode only affects components with a single input. Multiple inputs are processed in separate goroutines, so they are still async to each other.

Enforcing a StateLock mutex on all sync components would fix data races, but it wouldn't fix the "events order" problem, i.e. it doesn't guarantee the correct sequence of invocation.

It could be fixed by using a traditional Go select statement on multiple channels, but that is impossible because the list of channel variables isn't predefined.
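For a list of channels known only at run time, the standard library offers reflect.Select. The sketch below shows how a single goroutine could serve every inport so that handler invocations never overlap; the selectAll helper is hypothetical and is not a description of how GoFlow resolves this issue.

```go
package main

import (
	"fmt"
	"reflect"
)

// selectAll receives from a run-time list of channels and calls handle for
// every packet from a single goroutine, so invocations never overlap.
func selectAll(inputs []<-chan int, handle func(port, value int)) {
	cases := make([]reflect.SelectCase, len(inputs))
	for i, ch := range inputs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	open := len(cases)
	for open > 0 {
		i, v, ok := reflect.Select(cases)
		if !ok {
			// The channel was closed: a zero Chan makes reflect.Select
			// ignore this case from now on.
			cases[i].Chan = reflect.Value{}
			open--
			continue
		}
		handle(i, int(v.Int()))
	}
}

func main() {
	a, b := make(chan int), make(chan int)
	go func() { a <- 1; a <- 2; close(a) }()
	go func() { b <- 3; close(b) }()
	total := 0
	selectAll([]<-chan int{a, b}, func(port, value int) {
		total += value // no lock needed: only one goroutine runs handlers
	})
	fmt.Println(total) // 6
}
```

This serializes handlers, but the order between packets arriving on different inports is still whatever reflect.Select happens to pick.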

Explicit shutdown methods

Currently the networks are shut down with the downstream flow: closing an input of a process makes it close its outputs (after it finishes processing current data), closing inputs for the downstream process and so on until all processes in the network are stopped.

Some networks don't have input ports, they are started with IIPs and continue running until the application receives termination signal. Or in some cases the network has to be shut down as soon as possible without knowing about its inputs.

So, 2 new methods are proposed:

  • Graph.Stop() to shut the network down gracefully.
  • Graph.Kill() to terminate it urgently, losing all data that is still inside the network.

I also have a suspicion that the current "downstream shutdown" strategy might cause races in complex graphs with multiple merging branches. So this should be studied and probably alternative strategies should be considered.
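The difference between the two proposed methods can be sketched with a context and a done channel. The network type below is a toy single-process network invented for this example, not flow.Graph, and the way Stop and Kill are implemented here is an assumption about one possible design.

```go
package main

import (
	"context"
	"fmt"
)

// network is a toy single-process network; it is not flow.Graph.
type network struct {
	in      chan int
	cancel  context.CancelFunc
	done    chan struct{}
	handled int // read only after done is closed
}

func start() *network {
	ctx, cancel := context.WithCancel(context.Background())
	n := &network{in: make(chan int, 16), cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(n.done)
		for {
			select {
			case <-ctx.Done():
				return // Kill: whatever is still queued is dropped
			case _, ok := <-n.in:
				if !ok {
					return // Stop: input closed and fully drained
				}
				n.handled++
			}
		}
	}()
	return n
}

// Stop shuts down gracefully: queued packets are processed first.
func (n *network) Stop() {
	close(n.in)
	<-n.done
	n.cancel() // release the context
}

// Kill terminates as soon as possible; queued packets may be lost.
func (n *network) Kill() {
	n.cancel()
	<-n.done
}

// stopAfter queues count packets, stops gracefully, and reports how many were handled.
func stopAfter(count int) int {
	n := start()
	for i := 0; i < count; i++ {
		n.in <- i
	}
	n.Stop()
	return n.handled
}

func main() {
	fmt.Println(stopAfter(3)) // 3: a graceful stop loses nothing
}
```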

Closing Component channels that are not explicitly marked as ports

Somewhere under the hood, it seems as though goflow is calling close on any node in its graph if that node has an Out channel field, even if that port is not connected in the graph. For example,

package main

import (
	"log"

	"github.com/trustmaster/goflow"
)

type Receiver struct {
	name string
	In   <-chan []byte
	Out  chan<- []byte
}

func NewReceiver(name string) *Receiver {
	return &Receiver{name: name}
}

func (db *Receiver) Process() {
	for in := range db.In {
		log.Printf("Processing Receiver for node %s", db.name)
		db.Out <- in
	}
}

type DataBuffer struct {
	name string
	In   <-chan []byte
	Out  chan<- []byte
}

func NewDataBuffer(name string) *DataBuffer {
	return &DataBuffer{name: name}
}

func (db *DataBuffer) Process() {
	for in := range db.In {
		log.Printf("Processing for node %s: %v", db.name, in)
	}
}


func main() {
	g := goflow.NewGraph()

	g.Add("greeter", new(Receiver))
	g.Add("printer", new(DataBuffer))

	g.Connect("greeter", "Out", "printer", "In")

	g.MapInPort("In", "greeter", "In")

	in := make(chan []byte)

	g.SetInPort("In", in)

	wait := goflow.Run(g)

	in <- []byte{1, 0}

	close(in)

	<-wait
}

outputs

2009/11/10 23:00:00 Processing Receiver for node 
2009/11/10 23:00:00 Processing for node : [1 0]
panic: close of nil channel

Changing the name of DataBuffer.Out to DataBuffer.LeaveMeAlone Making the field private (or removing it) fixes the issue.

I'm new to goflow (looks neat!) so maybe this is intended behaviour. From a newbie perspective it is unexpected.

Run-time component instantiation

In order to implement #7 with loading graph at run-time, it is required that new processes are created at run-time without having to write and compile Go code like this:

sorter := foo.NewSorter()
printer := foo.NewPrinter()
net.Add(sorter, "sorter")
net.Add(printer, "printer")

What we need is something like a component registry and a run-time process factory, that would spawn a new process given just its string component name, e.g.:

net.Add("foo.Sorter", "sorter")
net.Add("foo.Printer", "printer")
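A registry of this kind can be sketched as a map from component names to constructor functions. Everything below (the Component interface, Register, Create, and the Sorter type) is hypothetical and only illustrates the mechanism; it is not the registry API of GoFlow.

```go
package main

import "fmt"

// Component is a hypothetical minimal interface for anything runnable.
type Component interface{ Process() }

// registry maps component names to constructors.
var registry = map[string]func() Component{}

// Register makes a component available under a string name.
func Register(name string, ctor func() Component) {
	registry[name] = ctor
}

// Create is the run-time factory: it builds a new process from a name.
func Create(name string) (Component, error) {
	ctor, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("component %q is not registered", name)
	}
	return ctor(), nil
}

// Sorter is a placeholder component for the demo.
type Sorter struct{}

// Process does nothing in this sketch.
func (*Sorter) Process() {}

func main() {
	Register("foo.Sorter", func() Component { return new(Sorter) })
	c, err := Create("foo.Sorter")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%T\n", c)
}
```

With such a factory, a graph loader only needs the string names found in a graph definition to instantiate processes.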

Routing

Hey guys. Just started using GoFlow. Great work!

Wondering if it's possible to implement a component which will have multiple conditional outputs? Assuming that condition handling is done by component.

I'm completely new to FBP and relatively new to Golang. So by reading goflow sourcecode I'm losing the big picture.
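A component with conditional outputs can be written in the same shape as the components in the README: one inport, several outports, and a Process() method that picks the outport per packet. The Router type and the even/odd condition below are made up for illustration, and the route helper wires the channels by hand instead of using a graph.

```go
package main

import "fmt"

// Router is a hypothetical component with one inport and two conditional outports.
type Router struct {
	In   <-chan int
	Even chan<- int
	Odd  chan<- int
}

// Process sends each packet to exactly one outport, chosen by the condition.
func (r *Router) Process() {
	for v := range r.In {
		if v%2 == 0 {
			r.Even <- v
		} else {
			r.Odd <- v
		}
	}
}

// route runs a Router over values and collects what each outport received.
func route(values []int) (even, odd []int) {
	in := make(chan int)
	// Buffered so that Process never blocks on a slow reader in this demo.
	evenCh := make(chan int, len(values))
	oddCh := make(chan int, len(values))
	r := &Router{In: in, Even: evenCh, Odd: oddCh}
	go func() {
		for _, v := range values {
			in <- v
		}
		close(in)
	}()
	r.Process() // returns once in is closed
	close(evenCh)
	close(oddCh)
	for v := range evenCh {
		even = append(even, v)
	}
	for v := range oddCh {
		odd = append(odd, v)
	}
	return even, odd
}

func main() {
	even, odd := route([]int{1, 2, 3, 4})
	fmt.Println(even, odd)
}
```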

TestInternalConnectionIIP test panics randomly: IIP write to a closed channel

TestInternalConnectionIIP panics when the IIP happens to be concurrently sent after the graph is already being shut down and the input channel is already closed.

=== RUN   TestInternalConnectionIIP
--- FAIL: TestInternalConnectionIIP (0.00s)
panic: send on closed channel

goroutine 142 [running]:
reflect.chansend(0xc0001ab140, 0xc000126388, 0x0, 0x569c7b)
	/usr/local/go/src/runtime/chan.go:665 +0x4b
reflect.Value.send(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82, 0x0, 0x596920)
	/usr/local/go/src/reflect/value.go:1524 +0x118
reflect.Value.Send(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82)
	/usr/local/go/src/reflect/value.go:1506 +0x90
github.com/trustmaster/goflow.(*Graph).sendIIPs.func1(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82, 0x52bf00)
	/home/ak/dev/go/src/github.com/trustmaster/goflow/graph_iip.go:78 +0x61
created by github.com/trustmaster/goflow.(*Graph).sendIIPs
	/home/ak/dev/go/src/github.com/trustmaster/goflow/graph_iip.go:77 +0xf3
exit status 2
FAIL	github.com/trustmaster/goflow	0.008s

The root cause of this issue boils down to the fact that goroutine A (spawned by sendIIPs()) attempts sending to a channel concurrently closed by goroutine B (executing echo component 1: e1). The chan close is triggered by closing the graph in port: close(in) in a goroutine spawned by TestInternalConnectionIIP().

There is no quick and simple fix for this since it's architecturally inaccurate to just close channels from one of multiple writing goroutines. There is a lot of material around this on the net, for example, good read here.

Thus, I'd like to open a conversation around this to get understanding of the requirements. For example:

  1. Other than IIPs, do we envision multiple writers to a single input channel in a graph?
  2. Why do we need IIPs to be sent concurrently rather than synchronously at the graph start?
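For reference, a usual Go pattern for several writers on one channel is to let a single coordinator close the channel after all writers have returned. The sketch below shows that pattern in isolation; fanIn and drainCount are hypothetical helpers, and this is one possible direction for the discussion, not the fix chosen in GoFlow.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn runs every writer in its own goroutine and closes out only after
// all of them have returned, so no writer can send on a closed channel.
func fanIn(out chan<- string, writers ...func(chan<- string)) {
	var wg sync.WaitGroup
	for _, w := range writers {
		wg.Add(1)
		go func(w func(chan<- string)) {
			defer wg.Done()
			w(out)
		}(w)
	}
	go func() {
		wg.Wait()
		close(out) // the only close, performed by the coordinator
	}()
}

// drainCount reads a channel until it is closed and counts the packets.
func drainCount(ch <-chan string) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	port := make(chan string)
	iip := func(out chan<- string) { out <- "initial" }
	external := func(out chan<- string) {
		for _, s := range []string{"a", "b"} {
			out <- s
		}
	}
	fanIn(port, iip, external)
	fmt.Println(drainCount(port)) // 3
}
```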

Improve error handling

Make all functions return error objects rather than bool and panics. Provide error messages which can be displayed e.g. in Flowhub rather than just console.
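As a sketch of the direction, a graph operation could return a wrapped error that callers can both display and test for. The toyGraph type, its Connect signature, and the ErrProcessNotFound sentinel are hypothetical and do not reflect the actual GoFlow API.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrProcessNotFound is a hypothetical sentinel error.
var ErrProcessNotFound = errors.New("process not found")

// toyGraph is a stand-in that only knows which process names exist.
type toyGraph struct{ procs map[string]bool }

// Connect returns a descriptive error instead of false or a panic.
func (g *toyGraph) Connect(sender, receiver string) error {
	for _, name := range []string{sender, receiver} {
		if !g.procs[name] {
			return fmt.Errorf("connect %s -> %s: %w: %q", sender, receiver, ErrProcessNotFound, name)
		}
	}
	return nil
}

// isNotFound reports whether err wraps ErrProcessNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrProcessNotFound)
}

func main() {
	g := &toyGraph{procs: map[string]bool{"greeter": true}}
	err := g.Connect("greeter", "printer")
	fmt.Println(err)             // a message suitable for a UI
	fmt.Println(isNotFound(err)) // true
}
```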

IIP in Component InPort results in uninitialized Channel and deadlocks.

If I have a Component with a port and add an Initial Information Packet to it without making the port an Inport for the network, the underlying channel does not get initialized.

In component.go the check for the channel not being nil will be true and therefore the recv handler does not get initialized. This results in a deadlock of threads.

I hope this is descriptive enough.

Array ports

FBP doesn't allow connecting a single output to multiple inputs, though it allows for explicit replicator components in the network.

Currently it isn't very easy to write components operating on generic data in Go without writing too many type assertions, so generic components would require a code generator tool.

Though, an idea of broadcast ports replicating data to multiple subscribers is worth considering.
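An explicit replicator for a single concrete type is short to write, as the sketch below shows. The replicate helper is hypothetical and typed for string packets only, which is exactly the limitation described above: another packet type would need its own copy or a code generator.

```go
package main

import "fmt"

// replicate copies every packet from in to each subscriber and closes all
// subscribers when in is closed. A slow subscriber delays the others.
func replicate(in <-chan string, outs ...chan<- string) {
	for v := range in {
		for _, out := range outs {
			out <- v
		}
	}
	for _, out := range outs {
		close(out)
	}
}

func main() {
	in := make(chan string)
	a, b := make(chan string, 2), make(chan string, 2)
	go replicate(in, a, b)
	in <- "x"
	in <- "y"
	close(in)
	for v := range a {
		fmt.Println("a got", v)
	}
	for v := range b {
		fmt.Println("b got", v)
	}
}
```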

Feedback needed

Hello fellow Gophers!

I apologise as this project slipped out of my scope for several years.

I still have some ideas and plans of maintaining it, but I need some feedback on how it is going to be used by people who actually tried it. So, I would really appreciate your answers to the following questions in this thread or in any other form (e.g. via email, please find the address in my profile).

Questions:

  1. What kind of application did you use GoFlow for? (E.g. bioinformatics, ETL, web backend, IoT, etc.)
  2. Did you use it for personal, business, or academic purposes?
  3. Do you prefer working with graphs in visual or text form?
  4. Which visual tools have you used and which ones do you prefer?
  5. Do you prefer a Component to have main Loop(), or do you prefer setting up handler functions on each port (OnPortName())?
  6. Do you prefer processes to stay resident in memory or started and stopped on demand?
  7. Have you ever used State Locks, Synchronous Mode, Worker Pool, or other advanced features of GoFlow?
  8. Please tell me what you liked about GoFlow and what you would like to be added or changed.

Why this is important

As you might have noticed, this codebase is a bit dated. In fact, it was written in 2011 and didn't change much ever since. My own views on how an FBP library should work have changed over time. So, I think this library deserves a rewrite.

My views can be similar or different from yours, while I'm not building this library only for myself. That's why feedback is appreciated so much.

Thank you for participating!

Support concurrency limits inside Components

It is sometimes desirable to set the number of active goroutines inside a Component. One common example would be to create a pool of workers to fetch items from a database, because too many active requests would cause a decline in overall throughput.

This issue is for discussing how to best solve the problem for both async and sync modes.

Note that it would be valuable to make the concurrency level adjustable. For example, if the network connection used by the Component becomes saturated, I'd like to tell the Component to reduce its active goroutine count.
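A fixed limit can be sketched with a buffered channel used as a counting semaphore. The processLimited and peakConcurrency helpers are hypothetical, and the sketch does not cover adjusting the limit at run time, which would need an additional control mechanism.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processLimited handles every item from in with at most limit handlers
// running at the same time.
func processLimited(in <-chan int, limit int, handle func(int)) {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for v := range in {
		sem <- struct{}{} // blocks while limit handlers are active
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot
			handle(v)
		}(v)
	}
	wg.Wait()
}

// peakConcurrency runs items handlers under the limit and reports the
// highest number that were active at once.
func peakConcurrency(items, limit int) int {
	in := make(chan int)
	go func() {
		for i := 0; i < items; i++ {
			in <- i
		}
		close(in)
	}()
	var mu sync.Mutex
	active, peak := 0, 0
	processLimited(in, limit, func(int) {
		mu.Lock()
		active++
		if active > peak {
			peak = active
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // hold the slot briefly
		mu.Lock()
		active--
		mu.Unlock()
	})
	return peak
}

func main() {
	fmt.Println(peakConcurrency(20, 3) <= 3) // true
}
```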

Races, lots of races

Output of go test -v -race is below. I can't even get the tests to finish executing.

=== RUN   TestSingleInput
--- PASS: TestSingleInput (0.00s)
=== RUN   TestStateLock
--- PASS: TestStateLock (0.12s)
=== RUN   TestSyncLock
--- PASS: TestSyncLock (0.03s)
=== RUN   TestInitFinish
--- PASS: TestInitFinish (0.00s)
=== RUN   TestClose
--- PASS: TestClose (0.00s)
=== RUN   TestShutdown
--- PASS: TestShutdown (0.00s)
=== RUN   TestPoolMode
--- PASS: TestPoolMode (0.00s)
=== RUN   TestStopProc
==================
WARNING: DATA RACE
Write at 0x00c42023c5d0 by goroutine 178:
  reflect.Value.SetBool()
      /usr/local/go/src/reflect/value.go:1364 +0x60
  github.com/abferm/goflow.RunProc()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:322 +0x18c8
  github.com/abferm/goflow.TestStopProc()
      /home/aferm/go/src/github.com/abferm/goflow/component_test.go:381 +0x2ae
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Previous read at 0x00c42023c5d0 by goroutine 152:
  reflect.Value.Bool()
      /usr/local/go/src/reflect/value.go:248 +0x52
  github.com/abferm/goflow.RunProc.func5()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:234 +0x14a
  github.com/abferm/goflow.RunProc.func8()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:331 +0x54

Goroutine 178 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:697 +0x543
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:882 +0xaa
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
  testing.runTests()
      /usr/local/go/src/testing/testing.go:888 +0x4e0
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:822 +0x1c3
  main.main()
      github.com/abferm/goflow/_test/_testmain.go:88 +0x20f

Goroutine 152 (finished) created at:
  github.com/abferm/goflow.RunProc()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:340 +0x192e
  github.com/abferm/goflow.TestStopProc()
      /home/aferm/go/src/github.com/abferm/goflow/component_test.go:361 +0x13e
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
==================
--- FAIL: TestStopProc (0.00s)
	testing.go:610: race detected during execution of test
=== RUN   TestLooper
--- PASS: TestLooper (0.00s)
=== RUN   TestFactory
--- PASS: TestFactory (0.00s)
=== RUN   TestFactoryConnection
--- PASS: TestFactoryConnection (0.00s)
=== RUN   TestFactorySubgraph
--- PASS: TestFactorySubgraph (0.00s)
=== RUN   TestRuntimeNetwork
--- PASS: TestRuntimeNetwork (0.00s)
=== RUN   TestConnection
--- PASS: TestConnection (0.00s)
=== RUN   TestComposite
--- PASS: TestComposite (0.00s)
=== RUN   TestMultiOutChannel
--- PASS: TestMultiOutChannel (0.00s)
=== RUN   TestIIP
--- PASS: TestIIP (0.00s)
=== RUN   TestStopNet
==================
WARNING: DATA RACE
Read at 0x00c4200b2c50 by goroutine 120:
  github.com/abferm/goflow.(*Graph).Stop()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:771 +0x57
  github.com/abferm/goflow.TestStopNet()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:391 +0x1d6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Previous write at 0x00c4200b2c50 by goroutine 78:
  github.com/abferm/goflow.(*Graph).run()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:651 +0x26c
  github.com/abferm/goflow.RunNet.func1()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c

Goroutine 120 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:697 +0x543
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:882 +0xaa
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
  testing.runTests()
      /usr/local/go/src/testing/testing.go:888 +0x4e0
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:822 +0x1c3
  main.main()
      github.com/abferm/goflow/_test/_testmain.go:88 +0x20f

Goroutine 78 (running) created at:
  github.com/abferm/goflow.RunNet()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
  github.com/abferm/goflow.TestStopNet()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:380 +0x123
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
==================
==================
WARNING: DATA RACE
Read at 0x00c42023ccf0 by goroutine 83:
  reflect.Value.Bool()
      /usr/local/go/src/reflect/value.go:248 +0x52
  github.com/abferm/goflow.RunProc.func3()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:181 +0xc3
  github.com/abferm/goflow.RunProc.func7()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:299 +0x375

Previous write at 0x00c42023ccf0 by goroutine 78:
  reflect.Value.SetBool()
      /usr/local/go/src/reflect/value.go:1364 +0x60
  github.com/abferm/goflow.RunProc()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:322 +0x18c8
  github.com/abferm/goflow.(*Graph).run()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:648 +0x106f
  github.com/abferm/goflow.RunNet.func1()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c

Goroutine 83 (running) created at:
  github.com/abferm/goflow.RunProc()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:317 +0x19de
  github.com/abferm/goflow.(*Graph).run()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:648 +0x106f
  github.com/abferm/goflow.RunNet.func1()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c

Goroutine 78 (running) created at:
  github.com/abferm/goflow.RunNet()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
  github.com/abferm/goflow.TestStopNet()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:380 +0x123
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
==================
--- FAIL: TestStopNet (0.00s)
	testing.go:610: race detected during execution of test
=== RUN   TestReconnection
==================
WARNING: DATA RACE
Write at 0x00c42012c290 by goroutine 251:
  reflect.Value.Set()
      /usr/local/go/src/reflect/value.go:1355 +0x109
  github.com/abferm/goflow.unsetProcPort()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:465 +0x244
  github.com/abferm/goflow.(*Graph).Disconnect()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:482 +0x1eb
  github.com/abferm/goflow.TestReconnection()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:436 +0x9b6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Previous read at 0x00c42012c290 by goroutine 257:
  reflect.Value.pointer()
      /usr/local/go/src/reflect/value.go:90 +0x85
  reflect.Select()
      /usr/local/go/src/reflect/value.go:2016 +0x87c
  github.com/abferm/goflow.RunProc.func7()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:295 +0xa4

Goroutine 251 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:697 +0x543
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:882 +0xaa
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
  testing.runTests()
      /usr/local/go/src/testing/testing.go:888 +0x4e0
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:822 +0x1c3
  main.main()
      github.com/abferm/goflow/_test/_testmain.go:88 +0x20f

Goroutine 257 (running) created at:
  github.com/abferm/goflow.RunProc()
      /home/aferm/go/src/github.com/abferm/goflow/component.go:317 +0x19de
  github.com/abferm/goflow.(*Graph).run()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:648 +0x106f
  github.com/abferm/goflow.RunNet.func1()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
==================
==================
WARNING: DATA RACE
Read at 0x00c42012c180 by goroutine 253:
  runtime.mapiternext()
      /usr/local/go/src/runtime/hashmap.go:730 +0x0
  github.com/abferm/goflow.(*Graph).run()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:642 +0x238
  github.com/abferm/goflow.RunNet.func1()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c

Previous write at 0x00c42012c180 by goroutine 251:
  runtime.mapassign()
      /usr/local/go/src/runtime/hashmap.go:485 +0x0
  github.com/abferm/goflow.(*Graph).Add()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:191 +0x2a0
  github.com/abferm/goflow.TestReconnection()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:441 +0xa0f
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Goroutine 253 (running) created at:
  github.com/abferm/goflow.RunNet()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
  github.com/abferm/goflow.TestReconnection()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:420 +0x82c
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Goroutine 251 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:697 +0x543
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:882 +0xaa
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
  testing.runTests()
      /usr/local/go/src/testing/testing.go:888 +0x4e0
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:822 +0x1c3
  main.main()
      github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
==================
==================
WARNING: DATA RACE
Write at 0x00c420314080 by goroutine 253:
  github.com/abferm/goflow.(*Graph).run()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:651 +0x26c
  github.com/abferm/goflow.RunNet.func1()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c

Previous read at 0x00c420314080 by goroutine 251:
  github.com/abferm/goflow.(*Graph).RunProc()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:748 +0x4c
  github.com/abferm/goflow.TestReconnection()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:451 +0xb50
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Goroutine 253 (running) created at:
  github.com/abferm/goflow.RunNet()
      /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
  github.com/abferm/goflow.TestReconnection()
      /home/aferm/go/src/github.com/abferm/goflow/network_test.go:420 +0x82c
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107

Goroutine 251 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:697 +0x543
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:882 +0xaa
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:657 +0x107
  testing.runTests()
      /usr/local/go/src/testing/testing.go:888 +0x4e0
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:822 +0x1c3
  main.main()
      github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
==================

Tests failing with Go 1.7+

$ go test
panic: send on closed channel

goroutine 1033 [running]:
github.com/trustmaster/goflow.(*counter).Loop(0xc4201a1680)
	/Users/trustmaster/go/src/github.com/trustmaster/goflow/component_test.go:426 +0x1c0
github.com/trustmaster/goflow.RunProc.func7(0x101, 0xc420075a10, 0xc420314250, 0x250c1f0, 0xc4201a1680, 0xc4200f90e0, 0xc4200759e0, 0xc4200f9100, 0x0, 0xc4200759b0)
	/Users/trustmaster/go/src/github.com/trustmaster/goflow/component.go:283 +0x2d8
created by github.com/trustmaster/goflow.RunProc
	/Users/trustmaster/go/src/github.com/trustmaster/goflow/component.go:309 +0x1408
exit status 2
FAIL	github.com/trustmaster/goflow	0.040s

Remote process communication

Once we have a framework to turn a visual graph into a running app, the next step is to let different flow-based apps talk to each other by means of remote ports and connections.

It would be nice to make such communication compatible with the NoFlo network protocol (noflo/noflo#107), so that NoFlo graphs could talk to GoFlow and vice versa.

flow performance

I tried to use flow to write a simple demo and then tested its performance. Too much time is spent in runtime.pthread_cond_signal. I tried to use flow.RunProc() or to set DefaultComponentMode, but I cannot find this function or this field.

Multiple Inputs connecting to same Output

Hey!

I recall reading somewhere that this doesn't support multiple connections to the same connector, but is this something that could be made possible?

I'm making a node system that will enable connecting several different inputs to the same output, so this is why I'm wondering.

Thanks!

Main graph support in runtime

NoFlo UI talks to the application's main graph via the runtime protocol. GoFlow currently creates the main graph in plain Go, but we could add support for the main graph in the runtime part.

Feature request: Broadcast from one proc port to n proc ports

It would be really helpful to have a specialized Connect function which
connects the output port of one component to n input ports.
Something like:

func (n *Graph) ConnectBroadcast(senderName, senderPort, clientPorts ...[]string) bool

where clientPorts are tuples of proc names & ports

As far as I can see there is no easy way to have a broadcast component in goflow without such a function.
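
For illustration, the fan-out behaviour requested above can be sketched with plain Go channels. This is only a minimal sketch under stated assumptions: `broadcast` is a hypothetical helper, not part of GoFlow, and it assumes string packets and receivers that keep reading (a slow receiver blocks the others).

```go
package main

import "fmt"

// broadcast is a hypothetical helper (not a GoFlow API). It copies every
// value received on in to each channel in outs, and closes all outs once
// in has been closed and drained.
func broadcast(in <-chan string, outs ...chan<- string) {
	for v := range in {
		for _, out := range outs {
			out <- v // blocks until this receiver accepts the value
		}
	}
	for _, out := range outs {
		close(out)
	}
}

func main() {
	in := make(chan string)
	// Buffered so that main can drain the receivers one after another.
	a := make(chan string, 2)
	b := make(chan string, 2)

	go broadcast(in, a, b)

	in <- "hello"
	in <- "world"
	close(in)

	for v := range a {
		fmt.Println("a:", v)
	}
	for v := range b {
		fmt.Println("b:", v)
	}
}
```

The same idea could be wrapped in a component with one inport and n outports; how such a component would be wired by a ConnectBroadcast-style function is left open here.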

Supporting multiple IIPs in one network?

I'm trying to execute the following network:

'./sample.csv' -> FILE Source(csv/reader)
'./output.csv' -> FILE Target(csv/writer)
Source DATA -> DATA Target

But for some reason the network hangs and the second IIP is never fired:

00:48:00 INF 1 run.go:29 > file="examples/simple.fbp" reading flow file
00:48:00 DBG 1 run.go:70 > IIP="'./sample.csv' -> FILE Source" adding IIP
00:48:00 DBG 1 run.go:70 > IIP="'./output.csv' -> FILE Target" adding IIP
00:48:00 DBG 10 csv.go:15 > CSVReader.Process()
00:48:00 DBG 12 csv.go:30 > CSVWriter.Process()
00:48:03 DBG 10 csv.go:18 > file="./sample.csv" CSVReader.Process()
00:48:03 DBG 12 csv.go:41 > file="" data="demo_table" CSVWriter.Process() 
00:48:03 DBG 10 csv.go:21 > CSVReader.Process() shutting down
...

And execution keeps waiting...

The expectation is to see a log line where both the file and data variables are provided.

The component code is provided below:

package components

import (
	"time"

	"github.com/phuslu/log"
)

type CSVReader struct {
	File <-chan string
	Data chan<- string
}

func (reader *CSVReader) Process() {
	log.Debug().Msg("CSVReader.Process()")
	for file := range reader.File {
		time.Sleep(time.Second * 3)
		log.Debug().Str("file", file).Msg("CSVReader.Process()")
		reader.Data <- "demo_table"
	}
	log.Debug().Msg("CSVReader.Process() shutting down")
}

type CSVWriter struct {
	File <-chan string
	Data <-chan string
}

func (writer *CSVWriter) Process() {
	log.Debug().Msg("CSVWriter.Process()")
	defer func() {
		log.Debug().Msg("CSVWriter.Process() shutting down")
	}()
	var (
		file, table string
	)
	for {
		select {
		case file = <-writer.File:
			if table != "" {
				log.Debug().Str("file", file).Str("data", table).Msg("CSVWriter.Process() ")
				file = ""
				table = ""
			}
		case table = <-writer.Data:
			if file != "" {
				log.Debug().Str("file", file).Str("data", table).Msg("CSVWriter.Process() ")
				file = ""
				table = ""
			}
		}
	}

}
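
Independent of how GoFlow delivers IIPs, one Go detail is worth checking in a select loop like the one above: a receive from a closed channel returns the zero value immediately, so `file = <-writer.File` yields an empty string on every iteration once that port is closed. The sketch below is not a confirmed diagnosis of this issue. It only shows, with a hypothetical helper `waitForPair`, how the two-value receive form and a nil channel can be used to collect one value from each port.

```go
package main

import "fmt"

// waitForPair is a hypothetical helper (not a GoFlow API). It blocks until
// one value has been received from each input. ok is false if a port is
// closed before it delivered a value.
func waitForPair(files, tables <-chan string) (file, table string, ok bool) {
	var haveFile, haveTable bool
	for !(haveFile && haveTable) {
		select {
		case f, open := <-files:
			if !open {
				if !haveFile {
					return "", "", false // closed before a file name arrived
				}
				files = nil // value already received; a nil channel is never selected
				continue
			}
			file, haveFile = f, true
		case t, open := <-tables:
			if !open {
				if !haveTable {
					return "", "", false // closed before any data arrived
				}
				tables = nil
				continue
			}
			table, haveTable = t, true
		}
	}
	return file, table, true
}

func main() {
	files := make(chan string, 1)
	tables := make(chan string)

	// Assumption for this sketch: the initial packet arrives once and the
	// port is closed afterwards.
	files <- "./output.csv"
	close(files)

	go func() { tables <- "demo_table" }()

	file, table, ok := waitForPair(files, tables)
	fmt.Println(file, table, ok)
}
```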

Manipulate a network in running

@trustmaster

Hello, I would like to know how to dynamically manipulate a running network.

I tested with the simple code below. However, inserting a new process into an already initialized network has no effect once the network is running.
I am not sure whether this is by design in flow-based programming.

In addition, I would like to hear whether implementing such dynamic manipulation of a network is meaningful or not.

type SrcTest struct {
	Out chan<- byte
}

type FilterTest1 struct {
	In <-chan byte
	Out chan<- byte
}

type FilterTest2 struct {
	In <-chan byte
	Out chan<- byte
}

type SinkTest struct {
	In <-chan byte
}

func (s *SrcTest) Process() {
	for {
		s.Out <- 'a'
		time.Sleep(100 * time.Millisecond)
	}
}

func (f *FilterTest1) Process() {
	for in := range f.In {
		_ = in
		f.Out <- 'b'
	}
}

func (f *FilterTest2) Process() {
	for in := range f.In {
		_ = in
		log.Infof("f2 f2")
		f.Out <- 'c'
	}
}

func (s *SinkTest) Process() {
	for in := range s.In {
		log.Infof("recv: %c", in)
	}
}

func TestStreamData(t *testing.T) {
	n := goflow.NewGraph()
	n.Add("src", new(SrcTest))
	n.Add("f1", new(FilterTest1))
	n.Add("sink", new(SinkTest))
	n.Connect("src", "Out", "f1", "In")
	n.Connect("f1", "Out", "sink", "In")

	wait := goflow.Run(n)
	go func() {
		time.Sleep(2 * time.Second)
		log.Infof("insert f2")


		n.Add("f2", new(FilterTest2))
		//n.Connect("f1", "Out", "f2", "In")
		n.Connect("src", "Out", "f2", "In")
		n.Connect("f2", "Out", "sink", "In")

	}()

	<-wait
}

Some code makes me confused

Thank you for your code. But when I read it, the code at
https://github.com/trustmaster/goflow/blob/master/network.go#L187
https://github.com/trustmaster/goflow/blob/master/network.go#L188
confuses me.
At line 180 or 194 in the same function, the variable 'vNet' has already been assigned, so why assign the value of 'n' to 'vNet'?
What's more, 'vNet' only appears as an lvalue and is never used again. So why do we need 'vNet'?
It really confuses me.

I would appreciate your help.

Thank you anyway.

Run-time network manipulation

Another aspect of manipulating graphs at run-time required for #7 is being able to reference and manipulate nodes in sub-graphs at run-time, e.g.:

app.AddGraph("feedGrabber")
app.Get("feedGrabber").Add("web.UrlReader", "reader")
app.Get("feedGrabber").Add("web.RssParser", "parser")
app.Get("feedGrabber").Connect("reader", "Data", "parser", "XML")
app.Get("feedGrabber").MapInPort("In", "reader", "URL")
app.Get("feedGrabber").MapOutPort("Out", "parser", "Entry")
app.Connect("crawler", "Link", "feedGrabber", "In")
app.Connect("feedGrabber", "Out", "storage", "FeedIn")
app.Run("feedGrabber")

It is also important to consider how running processes react to being connected to new nodes and to being disconnected from nodes.

Automatic channel creation on network Connect()

What annoyed me originally with GoFlow is that you need to create a channel manually every time you connect 2 processes in a graph:

net.Connect("src", "Out", "dst", "In", make(chan string))

It would be better if GoFlow inferred the channel type using reflection and created it on its own, so the above code could be simplified just to this:

net.Connect("src", "Out", "dst", "In")

Back in 2012 this was not possible because Go's reflect package didn't let you create a bidirectional channel from a unidirectional one. Nowadays it is possible, so we can give it another try.
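
A minimal sketch of the reflection approach, using only the standard `reflect` package. The `connect` function and the `Source`/`Sink` types are hypothetical and are not GoFlow's actual implementation. The sketch assumes both arguments are pointers to structs whose ports are exported channel fields.

```go
package main

import (
	"fmt"
	"reflect"
)

// Source and Sink are hypothetical components used only for this sketch.
type Source struct {
	Out chan<- string
}

type Sink struct {
	In <-chan string
}

// connect infers the element type from the sender's port, creates a
// bidirectional channel of that type and assigns it to both ports.
// sender and receiver must be pointers to structs.
func connect(sender interface{}, outPort string, receiver interface{}, inPort string, bufSize int) error {
	out := reflect.ValueOf(sender).Elem().FieldByName(outPort)
	in := reflect.ValueOf(receiver).Elem().FieldByName(inPort)
	if !out.IsValid() || !in.IsValid() {
		return fmt.Errorf("port %q or %q not found", outPort, inPort)
	}
	if out.Kind() != reflect.Chan || in.Kind() != reflect.Chan {
		return fmt.Errorf("both ports must be channels")
	}
	if !out.CanSet() || !in.CanSet() {
		return fmt.Errorf("ports must be exported fields")
	}
	if out.Type().Elem() != in.Type().Elem() {
		return fmt.Errorf("element types differ: %v vs %v", out.Type().Elem(), in.Type().Elem())
	}
	// A bidirectional channel value is assignable to both chan<- T and <-chan T.
	chanType := reflect.ChanOf(reflect.BothDir, out.Type().Elem())
	ch := reflect.MakeChan(chanType, bufSize)
	out.Set(ch)
	in.Set(ch)
	return nil
}

func main() {
	src, dst := new(Source), new(Sink)
	if err := connect(src, "Out", dst, "In", 1); err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	src.Out <- "ping" // buffered, so this does not block
	fmt.Println(<-dst.In)
}
```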

How to wait for two inputs?

Hello Vlad!

Great to see work going on, on GoFlow! 👍

I started looking at it again for some recent use cases, but had one question: how would I implement a component that waits for two inputs before it does its job? The default way seems to be to implement an On<InportName>() function, so I would need something like On<Inport1><Inport2>() :) ... but I'm sure there is a better way to do this?

Best
// Samuel
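
One way to picture the behaviour asked about, outside of any framework, is a process loop that receives from both inports before doing its work. This is only a sketch with plain channels: `Adder` is a hypothetical component, and whether a component may or should close its own outport depends on the framework.

```go
package main

import "fmt"

// Adder is a hypothetical component with two inports and one outport.
type Adder struct {
	A   <-chan int
	B   <-chan int
	Sum chan<- int
}

// Process waits for one value on each inport, then emits their sum.
// It stops as soon as either inport is closed. In this standalone sketch
// the component closes its own outport.
func (c *Adder) Process() {
	defer close(c.Sum)
	for {
		a, ok := <-c.A
		if !ok {
			return
		}
		b, ok := <-c.B
		if !ok {
			return
		}
		c.Sum <- a + b
	}
}

func main() {
	a, b, sum := make(chan int), make(chan int), make(chan int)
	go (&Adder{A: a, B: b, Sum: sum}).Process()

	go func() {
		a <- 1
		a <- 10
		close(a)
	}()
	go func() {
		b <- 2
		b <- 20
		close(b)
	}()

	for s := range sum {
		fmt.Println(s)
	}
}
```

The sequential receives pair the n-th value of A with the n-th value of B. If the inputs should be accepted in whatever order they arrive, a select over both ports is the usual alternative.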

Component with multiple inputs not stopped

I discovered that a component with 2 (or more) input channels is not stopped properly, even though all input channels are closed.

I also created a test in this gist.

In the test I create a simple net:

------
| C1 | \    ------
------  \-> |    |
            | C2 |
------  /-> |    |
| C1 | /    ------
------

After closing both In ports I would expect that the whole net will be stopped, but the C2 component actually keeps on running.

Am I doing something wrong, or is this a bug?
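
For comparison, the expected shutdown behaviour can be sketched with plain channels, independently of GoFlow's internals. The hypothetical `merge` function below plays the role of C2: it keeps running while at least one input is open and terminates once both are closed.

```go
package main

import "fmt"

// merge is a hypothetical stand-in for C2 (not GoFlow code). It forwards
// values from both inputs and returns, closing out, only after both
// inputs have been closed.
func merge(in1, in2 <-chan int, out chan<- int) {
	defer close(out)
	for in1 != nil || in2 != nil {
		select {
		case v, ok := <-in1:
			if !ok {
				in1 = nil // a nil channel is never selected again
				continue
			}
			out <- v
		case v, ok := <-in2:
			if !ok {
				in2 = nil
				continue
			}
			out <- v
		}
	}
}

func main() {
	in1, in2, out := make(chan int), make(chan int), make(chan int)
	go merge(in1, in2, out)

	// Two senders standing in for the two C1 processes.
	go func() { in1 <- 1; close(in1) }()
	go func() { in2 <- 2; close(in2) }()

	total := 0
	for v := range out { // ends because merge closes out
		total += v
	}
	fmt.Println("total:", total)
}
```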
