Streams and Pipelines in EK9

This pipelining concept has been touched upon in the Introduction, please read that section first as this section has much more detail.

Concept

The concept of a pipeline process is present in the Unix shell and also programming languages with chained methods (like filter, map and reduce). In the EK9 language the idea is to 'stream' zero or more Objects through a notional sequence of commands.

During the streaming, some commands may be configured to prevent further passage of Objects that have specific properties or characteristics.

But importantly as an Object passes through these commands; it can also be 'transformed' into a different type. This may mean the data held is enriched; or simplified.

Finally the Objects (if any) emerging from the pipeline can be used in some way.

So there are three discrete phases to this streaming:

Starting the stream

The EK9 language has two different ways to catenate Objects.

Just these two mechanisms are all that is needed to start a Stream pipeline. Any type of Object can be sent through the pipeline, this includes function delegates. This is very important if you wish to do any sort of 'asynchronous' processing.

Pipeline processing

The streaming mechanism uses the pipe '|' symbol for joining parts of the pipeline just like Unix; and it includes:

But also adds:

Collection or Reduction

At the end of the pipeline zero or more Objects will flow out. A decision of what to do with these Objects must be taken. You can do one of three things with them.

When you are developing your own records and classes you can provide your own '|' operator implementation and then your types can be used as the terminal ('collector'/'sink') in a stream pipeline.

If you are familiar with Unix/Linux this type of syntax will be second nature to you.

Examples

So as to aid understanding; there are a number of inline examples below. Following each of these there is an explanation of how the code works.

There are some examples elsewhere on this site that show join, call and async. The '>>' is shown in the duration/date example.

For a 'reducing' or 'accumulating' example; see Integer pipeline.

If you have a need to stream Objects through a pipeline and then at the final stage only keep the final N of them based on some weighting criteria; then consider using a priority queue configured to a finite size.

The Preamble and setup

The source code below uses an example of 'Books' and 'Authors' and demonstrates the rest of the stream pipeline commands. It introduces the concept of a library of books that have been published. It then uses the stream pipeline functionality to extract information about the books. The first few examples are trivial; but the latter build up quite a significant processing pipeline.

The same setup EK9 source code will be used for all the examples, it is shown here once for brevity.

#!ek9
defines module introduction

  defines type

    AuthorId as Integer constrain
      > 0

    Age as Integer constrain
      > 0

    Name as String constrain as
      matches /^[a-zA-Z -]+$/

    BookTitle as String constrain as
      matches /^[a-zA-Z0-9 -+]+$/

    <?-
      Simple enumeration for controlling filtering
    -?>
    BookFilterSelection
      SkipTwo
      JustFirst
      JustLast

  defines class

    <?-
      Models the author of books.
    -?>
    Author
      id as AuthorId?
      age as Age?
      firstname as Name?
      surname as Name?

      Author() as pure
        ->
          id as AuthorId
          age as Age
          firstname as Name
          surname as Name
        assert id? and age? and firstname? and surname?
        this.id:=? AuthorId(id)
        this.age:=? Age(age)
        this.firstname:=? Name(firstname)
        this.surname:=? Name(surname)

      Author() as pure
        -> author as Author
        this(author.id(), author.age(), author.firstname(), author.surname())

      <?-
        Stop creation of empty Author.
      -?>
      default private Author() as pure

      id() as pure
        <- rtn as AuthorId: id

      age() as pure
        <- rtn as Age: age

      firstname() as pure
        <- rtn as Name: firstname

      surname() as pure
        <- rtn as Name: surname

      operator #? as pure
        <- rtn as Integer: #?id

      operator $ as pure
        <- rtn as String: `Author: ${firstname} ${surname}, Age: ${age}`

      operator ? as pure
        <- rtn as Boolean: age? and firstname? and surname?

      operator :=:
        -> author as Author
        id :=: author.id()
        age :=: author.age()
        firstname :=: author.firstname()
        surname :=: author.surname()

    Book
      title as BookTitle?
      author as Author?
      published as Date?

      Book() as pure
        ->
          title as BookTitle
          author as Author
          published as Date
        assert title? and author? and published?
        this.title:=? BookTitle(title)
        this.author:=? Author(author)
        this.published:=? Date(published)

      author() as pure
        <- rtn as Author: author

      published() as pure
        <- rtn as Date: published

      operator $ as pure
        <- rtn as String: `Title: ${title}, ${author}, Published: ${published}`

      operator #? as pure
        <- rtn as Integer: #? $this

      operator #^ as pure
        <- rtn as String: $this

      operator <=> as pure
        -> book as Book
        <- rtn as Integer: published() <=> book.published()

    <?-
      Model the concept of a library, just an in-memory map for now
    -?>
    Library
      books as List of Book?

      Library() as pure
        -> withBooks as List of Book
        books :=? withBooks

      Library() as pure
        this([
          Book(BookTitle("Java"), Author(AuthorId(1), Age(50), Name("John"), Name("Doe")), 1998-01-01),
          Book(BookTitle("C++"), Author(AuthorId(1), Age(42), Name("John"), Name("Doe")), 1990-01-07),
          Book(BookTitle("Scala"), Author(AuthorId(1), Age(67), Name("John"), Name("Doe")), 2015-03-02),
          Book(BookTitle("Python"), Author(AuthorId(1), Age(62), Name("John"), Name("Doe")), 2010-12-02),
          Book(BookTitle("HTML"), Author(AuthorId(2),Age(58), Name("Mark"), Name("Pickford")), 2008-07-02),
          Book(BookTitle("CSS"), Author(AuthorId(4), Age(51), Name("Mark"), Name("Keely")), 2008-04-02),
          Book(BookTitle("ADA"), Author(AuthorId(5), Age(44), Name("Ada"), Name("Lovelace")), 1988-01-02),
          Book(BookTitle("Dart"), Author(AuthorId(6), Age(47), Name("Peter"), Name("Dove")), 2020-01-02),
          Book(BookTitle("C#"), Author(AuthorId(7), Age(60), Name("William"), Name("Fence")), 2012-10-02),
          Book(BookTitle("Javascript"), Author(AuthorId(3), Age(52), Name("James"), Name("Pickford")), 2008-03-02),
          Book(BookTitle("C"), Author(AuthorId(1), Age(42), Name("John"), Name("Doe")), 1990-01-07),
          Book(BookTitle("C++"), Author(AuthorId(7), Age(38), Name("William"), Name("Fence")), 1990-04-02),
          Book(BookTitle("C"), Author(AuthorId(7), Age(38), Name("William"), Name("Fence")), 1990-04-14),
          Book(BookTitle("Haskell"), Author(AuthorId(7), Age(30), Name("William"), Name("Fence")), 1982-04-14),
          Book(BookTitle("Lisp"), Author(AuthorId(7), Age(25), Name("William"), Name("Fence")), 1977-09-24)
          ])

      <?-
        While you would never really 'iterate' through all the books in a library - this
        just enables the demonstration of streaming.
        Clearly you would normally have multiple maps/indexes etc to locate books via a search
        mechanism.
      -?>
      iterator() as pure
        <- rtn as Iterator of Book: books.iterator()

      operator + as pure
        -> book as Book
        <- rtn as Library: Library(books + book)

    

The Model

The model is quite simple, a few types (note use of simple strong typing), a couple of classes for Books, Authors and the Library.
As this is a stream pipeline example and more functional the pure key word has been used to highlight immutability (actually it's more than a highlight - the compiler enforces it). Strictly speaking clones/copies of properties should have been taken when returning values with accessor methods. As this would then prevent any alterations being made to Objects provided or returned.

The Programs

The first program is the most simple and just outputs all the books to 'Standard Out'

Program 1

...
  defines program

    <?-
      Just send all the books in the library out to stdout.
      Because Library has an iterator method 'cat' will use it to get each book.
    -?>
    JustCatBooks()
      stdout <- Stdout()
      library <- Library()

      cat library > stdout

//EOF

The output of the above program is as follows:

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24

The output is unsurprising, as no real processing has taken place. Note by providing the '#^' (promote) operator on the Book class; Objects of type Book can be sent straight to stdout as they get converted (promoted) to a String.

Program 2

This program just sorts the Books by Author ID and outputs all the books to 'Standard Out'

The use of sort is the first example of a stream pipeline command, it takes a single parameter. This parameter is the name of a function. It could have been anything that provided a function; this includes a call to a higher order function. It could also be a dynamic function. In this case a simple pure function has been used.

By building a library of short pure functions together with a set of records, traits and classes you can optimise your code reuse and importantly create a set of unit tests and examples of how they can be used.

...
  defines function

    comparingAuthor() as pure
      ->
        book1 as Book
        book2 as Book
      <-
        rtn as Integer: book1.author().id() <=> book2.author().id()

  defines program

    <?-
      Now sort the books before outputting.
      This uses a specific comparator.
    -?>
    SortBooksByAuthor()
      stdout <- Stdout()
      library <- Library()

      cat library | sort by comparingAuthor > stdout

//EOF

The output of the program is as follows

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24

The output is ordered as you would expect by Author ID.

Program 3

John Doe has two books published on the same date (1990-01-07), this next example now groups books by the same author if they were published on the same date.

...
  defines function

    dateBookPublished() as pure
      -> book as Book
      <- rtn as Date: book.published()

  defines program

    <?-
      Example of grouping books by their published date and outputting firstly
      by that published date. But the authors will in effect by the ordering within that group.
      So as the stream is sorted by author they are passed through to grouping as a new 'published'
      date is encountered a new group is created and that book by the author is added.
      The 'group' intermediate operation finally outputs all the groups in the order they were created.
      This output is in the form of List of Book, hence the need to flatten the output.
    -?>
    GroupBooksByPublishedDate()
      stdout <- Stdout()
      library <- Library()
      cat library | sort by comparingAuthor | group by dateBookPublished | flatten > stdout

//EOF

The output of the program is as follows

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24

Now the books by John Doe are grouped together for example.

Program 4

This example program now orders the books by their published date, but leaves them ordered by author.

...
  defines function

    compareDatePublished() as pure
      ->
        book1 as Book
        book2 as Book
      <-
        rtn as Integer: book1.published() <=> book2.published()

    orderOnPublishedDate() as pure
      -> books as List of Book
      <- rtn as List of Book: cat books | sort by compareDatePublished | collect as List of Book

  defines program

    <?-
      This example outputs the books in author order, but also for each of those authors sorts their
      books by published date.
    -?>
    ProcessByAuthor()
      stdout <- Stdout()
      library <- Library()

      //This is the nearest ek9 gets to a lambda - it's a bit 'wordy'.
      //In effect we are creating a function delegate that is a 'Function' that accepts a 'Book' and
      //returns an 'AuthorId' type. It is pure in nature - it does not mutate the book in anyway.
      //The arguments of the 'Function' inferred from its definition and so can just be used in the parenthesis.

      authorId <- () is Function of (Book, AuthorId) as pure function (r:=? t.author().id())

      cat library
        | sort by comparingAuthor
        | group by authorId
        | map by orderOnPublishedDate
        | flatten
        > stdout

//EOF

Now this is a little more interesting!
Again the list is ordered by Author ID first. But then the stream is group(ed) into Lists of Books by each specific author.

The next command in the pipeline is map, this uses a function orderOnPublishedDate. Note here that the Object being streamed through the pipeline has now become a 'List of Book', not just 'Book'. So function orderOnPublishedDate accepts a 'List of Book', it too now uses a Stream pipeline to order that list. It returns a totally new (but ordered) list, here you can see collect as being used - which is a stream pipeline terminal command.

Now we need a stream of 'Book' again and not a stream of 'List of Book', so just use the flatten (thank you origin?) to take the contents of each List and output the Books in sequence. Note that flatten can be used with Optional and Result as well.

One of the issues with Stream pipelines (and chaining methods in other languages) is 'knowing' what the type is at any point along the pipeline; as this type an be altered during processing. EK9 makes this easy because of strong and explicit typing of functions and methods.

For example:

Summary

This processing has been somewhat more sophisticated, but each function as a single simple purpose and they can and will be reused (and can be tested in isolation). Importantly the stream pipeline itself remains very readable, succinct and easy to understand. But there's no doubt this approach takes more thought to design and implement than an imperative loop with state variables!

The output of the program is as follows

Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02

As you can see the list of books is now in both author order and book publish date order (for that author).

Program 5

This example builds on the last one, but limits the output to authors with three or more books.

...
  defines function

    sufficientBooks() as pure
      -> books as List of Book
      <- rtn as Boolean: length books >= 3

  defines program
    <?-
      This example shows two things, firstly it inlines the dynamic function (not keen on this but it can be done).
      Then it selects/filters the lists that do not have sufficient books (in this case three or more).
      Only then does it order those books on their published date.
    -?>
    ProcessByAuthorWithThreeOrMoreBooks()
      stdout <- Stdout()
      library <- Library()

      cat library
        | sort by comparingAuthor
        | group by () is Function of (Book, AuthorId) as pure function (r:=? t.author().id())
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | flatten
        > stdout

//EOF

Because the pipeline has now become quite long, the alternative vertical layout has been employed. In general vertical layouts make code much more readable (though increase page length!).

This example uses the filter/select command to only allow a 'List of Book' where there are three or more books in the list.
You might argue that the 'predicate' of 'sufficient number of books' is just really an 'if statement' or a 'lambda'. But the point in EK9 is to capture business logic, by having a function called 'sufficientBooks' this is explicit.

The output of the program is as follows

Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02

Program 6

Again this example builds on the last, but just omits the first two books by authors that have three or more books published.

...

  defines program

    <?-
      Similar to the other examples, but this one just skips the first two books by
      each author (if they have 3 or more books).
      Note, there is a sort of common pattern to these, if an intermediate operation needs
      a function, you can either define a basic function or just create a dynamic function.
    -?>
    ProcessByAuthorWithThreeOrMoreBooksIgnoreFirstTwo()
      stdout <- Stdout()
      library <- Library()

      authorId <- () is Function of (Book, AuthorId) as pure function (r:=? t.author().id())
      excludingFirstTwoBooks <- () is Function of(List of Book, List of Book) as pure function
        r:=? cat t | skip 2 | collect as List of Book

      cat library
        | sort by comparingAuthor
        | group by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map by excludingFirstTwoBooks
        | flatten
        > stdout

//EOF

The output of the program is as follows (for both examples above)

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02

The following two examples follow in the same theme, by showing just the first and just the last book by the author with three or more books. By introducing the dynamic function excludingFirstTwoBooks books can be skipped and ignored by specific authors.

The dynamic function excludingFirstTwoBooks could have been written as a standard function. Alternatively the dynamic function could have been returned through the use of a higher function.

Reuse

If you have a range of requirements that need different views/summaries or filters on a set of data; the development of a number of reusable functions as above demonstrates how you can maximize code reuse.

While it is possible to use an imperative 'loop' style with loop variables and intermediate collections built up during processing, none of this tends to be reusable as it is nested in the loops and is conditioned.

Clearly if you only have a single simple requirement to access the latest book published by author ID=1 you might be tempted by a simple loop. In general (depending on the longevity of the software) it almost inevitable additional queries will be required. By adopting the Stream pipeline approach early you may have to write slightly more code, but then it will be obvious how to extend it and more importantly reuse what you've already written. Moreover the design pattern is a known one rather than being some form of while, do-while or iterator loop. A short example of this is shown later.

Program 7

Below is an example of using an enumeration and a higher order function; this returns a function based on the enumeration value.

...
  defines function

    bookFilter() as pure abstract
      -> books as List of Book
      <- filtered as List of Book?

    <?-
      Example of a higher order function and dynamic functions.
      There are other ways to solve the mapping of an enumerations requirement to a function.
      This uses a switch/given expression in a pure context
    -?>
    suitableBookFilter() as pure
      -> selection as BookFilterSelection
      <- rtn as bookFilter: given selection
        <- theFilter as bookFilter?
        when BookFilterSelection.SkipTwo
          rtn:=? () is bookFilter as pure function (filtered:=? cat books | skip 2 | collect as List of Book)
        when BookFilterSelection.JustFirst
          rtn:=? () is bookFilter as pure function (filtered:=? cat books | head 1 | collect as List of Book)
        when BookFilterSelection.JustLast
          rtn:=? () is bookFilter as pure function (filtered:=? cat books | tail 1 | collect as List of Book)
        default
          rtn:=? () is bookFilter as pure function (filtered:=? t)

  defines program

    <?-
      Example of using a higher order function to create the book filter.
    -?>
    ProcessByAuthorUsingHigherOrderFunction()
      stdout <- Stdout()
      library <- Library()
      authorId <- () is Function of (Book, AuthorId) as pure function (r:=? t.author().id())

      stdout.println("Omit first two books where author has three or more books")
      cat library
        | sort by comparingAuthor
        | group by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with suitableBookFilter(BookFilterSelection.SkipTwo)
        | flatten
        > stdout

      //EOF

The example above just shows how an enumeration can be used with a higher order function to 'switch' on the enumeration to return an appropriate dynamic function. From this example it's quite easy to see how the above pipeline code can be refactored again, so that it too can be wrapped into a function.

Program 8

This program is a simple refactoring of the one above, to highlight functional composition and how useful strong typing of functions and higher order functions can be.

...
  defines function

    filterBooksToOutput()
      ->
        library as Library
        filterSelection as BookFilterSelection
        output as StringOutput

      authorId <- () is Function of (Book, AuthorId) as pure function (r:=? t.author().id())

      cat library
        | sort by comparingAuthor
        | group by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with suitableBookFilter(filterSelection)
        | flatten
        > output

  defines program

    <?-
      Next logical step is to extract the above and put it into a function called 'filterBooksToOutput'
      Then it can just be called with appropriate parameters.
    -?>
    ProcessByAuthorUsingParameterisedFunction()
      stdout <- Stdout()
      library <- Library()

      stdout.println("Omit first two books where author has three or more books")
      filterBooksToOutput(library, BookFilterSelection.SkipTwo, stdout)

      stdout.println("First book where author has three or more books")
      filterBooksToOutput(library, BookFilterSelection.JustFirst, stdout)

      stdout.println("Last book where author has three or more books")
      filterBooksToOutput(library, BookFilterSelection.JustLast, stdout)

//EOF

By mixing in simple (enumerated) types and the switch expression it is possible to see how a range of different solutions can be structured with a number of small reusable functions.

When linked with the polymorphism of traits, classes and importantly functions the whole solution is more flexible and fungible in terms of which actual filter functions are used and also where results are output.

By taking the time to create abstract functions, simple types and records with a select number of operators refactoring and functional composition is much easier.

The output is as you would expect; the same as the previous example.

Omit first two books where author has three or more books
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
First book where author has three or more books
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Last book where author has three or more books
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02

Program 9

This example is slightly different and demonstrates how parts of the processing pipeline can be tee'd off and saved to be used later. It also demonstrates how streamed items can be removed if duplicated by some key or other (in this case the publication date is used).

...
  defines function

    bookSigningEvent() as pure
      -> book as Book
      <- rtn as String: `Date: ${book.published()}`

  defines program

    UniquePublishingDatesFromAuthorWithThreeOrMoreBooks()
      stdout <- Stdout()
      library <- Library()
      authorId <- () is Function of (Book, AuthorId) as pure function (r:=? t.author().id())

      authorsBooks as List of Book: List()

      booksByEachAuthor as List of List of Book: List()

      stdout.println("Unique signing events on day of publication by authors with three or more books.")

      cat library
        | sort by comparingAuthor
        | group by authorId
        | tee in booksByEachAuthor
        | filter by sufficientBooks
        | map by orderOnPublishedDate
        | flatten
        | tee in authorsBooks
        | uniq by dateBookPublished
        | sort by compareDatePublished
        | map with bookSigningEvent
        > stdout

      stdout.println("Books From author with three or more books")
      cat authorsBooks > stdout

      stdout.println("There are " + $ length booksByEachAuthor + " authors in total with any number of books")

//EOF

The output of the program is as follows

Unique signing events on day of publication by authors with three or more books.
Date: 1977-09-24
Date: 1982-04-14
Date: 1990-01-07
Date: 1990-04-02
Date: 1990-04-14
Date: 1998-01-01
Date: 2010-12-02
Date: 2012-10-02
Date: 2015-03-02
Books From author with three or more books
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
There are 7 authors in total with any number of books

The interesting part in the example above is the multiple use of the tee command to take contents as they stream through the pipeline to be processed and store them in separate collections.

Limiting the output by the use of the uniq command on a specific field (in this case the function dateBookPublished uses the date the book was published as the uniqueness key).

Program 10 (The Final Example)

This example is totally different, it only uses book authors where their age is 50 or over (at the time of writing the book). It then ensures that the author is only included once, finally it sorts the authors by surname (and first name if their surname is the same) and outputs that name.

...
  defines function

    bookAuthor() as pure
      -> book as Book
      <- author as Author: book.author()

    acceptableAuthorAge() as pure
      -> author as Author
      <- rtn as Boolean: author.age() >= Age(50)

    authorName() as pure
      -> author as Author
      <- rtn as String: `${author.firstname()} ${author.surname()}`

    comparingAuthorName() as pure
      ->
        author1 as Author
        author2 as Author
      <-
        rtn as Integer: author1.surname() <=> author2.surname()
      if rtn == 0
        rtn: author1.firstname() <=> author2.firstname()

  defines program

    LibraryExample()
      stdout <- Stdout()
      library <- Library()

      cat library
        | map with bookAuthor
        | select by acceptableAuthorAge
        | uniq
        | sort by comparingAuthorName
        | map with authorName
        > stdout

//EOF

The output of the program is as follows

John Doe
William Fence
Mark Keely
James Pickford
Mark Pickford

Summary

The Stream pipeline functionality takes the most functional approach to software development, but when used with functions, traits, records, classes and collections removes the need for deep and layered nested loops with stateful variables.

It can also be tolerant of missing data if needs be. While not shown in these examples, if the published data had been unset there would have been no null pointer exceptions.

It can take some time for developers with an imperative or Object-Oriented background to become used to this approach. With many OO languages now including 'chained'/'fluent' API's; maybe the clean simple and composable syntax in EK9 should be easier to understand and adopt. The syntax has been chosen to be very similar to the Unix/Linux shell 'pipe' approach as the concept is so similar.

There is little doubt that this same functionality could have been written with much less code using an imperative style. EK9 does support this; but in general the smaller, more rounded and documented the components developed; then more reliable/reusable and easier to understand the code is over the longer term.

But probably more importantly - the design pattern is a standard one; it leaps off the page as to what is being done. This is very unlike nested arbitrary loops, these tend to need more time to 'grok'. Each time similar functionality is needed it can and probably will be implemented in slightly different ways (this can lead to confusion and defects).

In fact here is a partial procedural/imperative implementation without the author name sorting as a contrast.

...
  defines program
    ImperativeLibraryExample()
      stdout <- Stdout()
      library <- Library()

      uniqueAuthors as Dict of (AuthorId, Author): Dict()

      for book in library
        author <- book.author()
        if author.age() >= 50
          if uniqueAuthors not contains author.id()
            uniqueAuthors += DictEntry(author.id(), author)

      for author in uniqueAuthors.values()
        stdout.println(`${author.firstname()} ${author.surname()}`)

//EOF

The uniq processing has been achieved through the use of a Dictionary/Map, but it could have been done with a List.
Here is the output (note authors names are not sorted)

John Doe
Mark Pickford
James Pickford
Mark Keely
William Fence        
    

The code is somewhat shorter (albeit without any sorting of author names) and for one-off short simple processing maybe it is a better approach. But the minute you then need to make a few alterations here and there; the stream pipeline approach become much easier to alter change and manipulate.

Conclusion

Hopefully these stream pipelines will give a developer a set of flexible and standardised tools to process collections of Objects in a flexible and clear manner. This more functional approach has much to be said for it as it reduces the number of methods needed on classes and provides for much more reuse of code.

But in general a more functional approach take quite a bit more thought and sometimes more code than an imperative approach. But when reviewing the code the functional approach tends to just leap of the page at you; as to how it works.

But with EK9 you now have a choice in your development approach.

Next Steps

Advanced class methods is covered in the next section. This is not related to streaming pipeline but is related to Classes.