Ray: Ray Streaming implementation plan

Created on 18 Nov 2019 · 15 comments · Source: ray-project/ray

This issue documents our Streaming plan as we have discussed, and can be used to track our progress later.

Currently, there are two streaming API prototypes implemented separately in Ray: one written in Java, the other in Python. However, neither is good enough, and both lack a common runtime with efficient data transfer, fault tolerance, scheduling, state management, etc., which are commonly required by real industry applications. At Ant Financial, we have built a streaming system based on Ray Core and put it into production, and we would like to contribute it to the community. Here is the design doc. We plan to open-source it in the following steps; each step may require one or more pull requests:

Data transfer: This will include the common code to send streaming data downstream efficiently with direct actor calls, and it will also support data buffering and back pressure. The core code will be implemented in C++; we will then integrate it with the current Java and Python streaming APIs so that both run on top of the same C++ implementation.
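
To make the buffering/back-pressure idea concrete, here is a minimal, Ray-free sketch: a bounded channel between an upstream writer and a downstream reader, where a full buffer blocks the writer until the reader drains records. The real implementation is in C++ and uses direct actor calls; all names here are illustrative only.

```python
import queue
import threading

class BoundedChannel:
    """Toy stand-in for a streaming data channel with back pressure."""

    def __init__(self, capacity: int):
        self._buf = queue.Queue(maxsize=capacity)  # bounded buffer

    def write(self, record, timeout=None):
        # Blocks when the buffer is full -> back pressure on the writer.
        self._buf.put(record, timeout=timeout)

    def read(self, timeout=None):
        return self._buf.get(timeout=timeout)

def demo():
    chan = BoundedChannel(capacity=2)
    received = []

    def reader():
        for _ in range(5):
            received.append(chan.read(timeout=5))

    t = threading.Thread(target=reader)
    t.start()
    for i in range(5):
        chan.write(i)  # later writes wait until the reader catches up
    t.join()
    return received
```

The same blocking-on-full behavior is what propagates back pressure upstream through a chain of operators.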

API improvement: We will improve the current Python/Java APIs, making them fully functional and aligned with each other.

Scheduling: There will be a Master actor that accepts job submissions and schedules all other Java/Python worker actors with specified strategies, to fully utilize the Ray cluster's resources while keeping the job running efficiently and healthily.
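
As a rough, Ray-free illustration of the scheduling step: a master receives a job (operators with parallelism) and assigns worker "actors" to free cluster slots. Class and field names are hypothetical, not the actual streaming code.

```python
class JobMaster:
    """Toy master that maps requested workers onto free cluster slots."""

    def __init__(self, cluster_slots):
        self.free_slots = list(cluster_slots)
        self.assignments = {}          # worker id -> slot

    def submit_job(self, operators):
        # operators: list of (name, parallelism) pairs
        for name, parallelism in operators:
            for i in range(parallelism):
                if not self.free_slots:
                    raise RuntimeError("cluster has no free slots")
                # Simple first-fit placement; real strategies are pluggable.
                self.assignments[f"{name}-{i}"] = self.free_slots.pop(0)
        return self.assignments
```

A real scheduler would also account for resource requirements and locality; this only shows the master-assigns-workers shape.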

Fault tolerance: This will implement the checkpoint and failover mechanism described in the design doc, including changes to the streaming Master, the streaming Workers, and the data transfer layer.
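
A minimal sketch of the checkpoint/rollback idea, independent of the design doc's actual protocol: a worker periodically snapshots its state together with its input offset, and on failure restores the last completed snapshot and replays from that offset. Names are illustrative.

```python
import copy

class CheckpointingWorker:
    """Toy worker that can snapshot and restore its processing state."""

    def __init__(self):
        self.state = {"count": 0}
        self.offset = 0                # position in the input stream
        self._snapshots = {}           # checkpoint id -> (state, offset)

    def process(self, record):
        self.state["count"] += record
        self.offset += 1

    def checkpoint(self, cp_id):
        # Deep-copy so later processing cannot mutate the snapshot.
        self._snapshots[cp_id] = (copy.deepcopy(self.state), self.offset)

    def restore(self, cp_id):
        self.state, self.offset = copy.deepcopy(self._snapshots[cp_id])
```

After `restore`, re-reading the input from the saved offset reproduces exactly-once results for this deterministic operator.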

State: This will contain the streaming State abstraction and its implementation for the streaming API, as well as a backend implementation to support large state with fault tolerance.
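
A hedged sketch of what a keyed value-state abstraction with a pluggable backend could look like (names are illustrative; see the State Management proposal for the real API). An in-memory dict stands in for a durable backend here.

```python
class MemoryBackend:
    """In-memory stand-in for a durable state backend."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key, default=None):
        return self._data.get(key, default)

class ValueState:
    """Per-key single-value state, namespaced by state name."""

    def __init__(self, backend, name):
        self._backend = backend
        self._name = name

    def update(self, key, value):
        self._backend.put((self._name, key), value)

    def value(self, key, default=None):
        return self._backend.get((self._name, key), default)
```

Swapping `MemoryBackend` for a persistent store is what would give large state with fault tolerance.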

Monitoring: This will implement some RESTful APIs and probably a web UI so that users can easily monitor their streaming jobs at any time; the web UI can be based on the current Ray web UI.

Cross-lang job: This will contain the cross-language implementation to support jobs consisting of both Java and Python workers, which is required by applications such as online learning.
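
As a rough illustration of the schema point mentioned later in this thread: both the Java and Python sides agree on a byte-level record layout, so records encoded by one language decode in the other. Here a fixed `(id: int64, value: float64)` layout packed with Python's stdlib `struct` stands in for the real cross-language format, which this sketch does not claim to match.

```python
import struct

# Little-endian int64 followed by float64; Java's ByteBuffer can read
# the same layout with ByteOrder.LITTLE_ENDIAN.
RECORD = struct.Struct("<qd")

def encode(record_id: int, value: float) -> bytes:
    return RECORD.pack(record_id, value)

def decode(payload: bytes):
    return RECORD.unpack(payload)
```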

Others: There may be other, smaller code changes we have not identified so far; they will be added here as we discover them.

All 15 comments

Looking over the implementation plan, it is not clear what the end state of the Python API will be. Is this something that has been investigated in detail yet?

Hi Eric,

For Python, in the final state, we want a Python API like Flink's (which should also be consistent with our Java API), with some extensions to support schemas and cross-language calls, as we need to support cross-language streaming jobs.

To achieve that, we will replace the current Python API implementation, following this more detailed plan:

  1. Rewrite the current Python API, making it consistent with the Java API.
  2. Map the Python streaming graph to a Java streaming graph, so that it can be submitted to a Java JobMaster for scheduling.
  3. Implement a Python JobWorker, which should have the same functionality as the Java JobWorker, be able to communicate with the Java JobMaster, and run Python user functions.
  4. Improve continuously together with the Java API by adding more features such as event time and state, making it fully functional like Flink.
  5. Add extensions to support a cross-language streaming API, graph, and schema, so that we can run jobs consisting of both Java and Python workers. Here the schema serves as the data format when data is exchanged between Java and Python workers.

Steps 1, 2, and 3 can be done in the API improvement and Scheduling parts; step 4 can probably be done together with the State part; and step 5 can be done in the Cross-lang job part.
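
The API shape targeted by steps 1 and 4, a Flink-style chain of operators, could look roughly like this. This is a hedged, runnable sketch with a toy local executor; the class and method names are illustrative, not the committed API.

```python
class DataStream:
    """Toy Flink-style stream with chainable operators."""

    def __init__(self, items):
        self._items = list(items)

    def map(self, fn):
        return DataStream(fn(x) for x in self._items)

    def filter(self, pred):
        return DataStream(x for x in self._items if pred(x))

    def collect(self):
        return self._items

# Operators chain fluently, mirroring Flink's DataStream style.
stream = DataStream(range(5)).map(lambda x: x * x).filter(lambda x: x > 3)
```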

Thanks @liangyi123 , this sounds pretty good.

One thing that would be great to put together, even early on, is documentation on how exactly Ray streaming differs from Flink, both API-wise and behaviorally. I can see users being quite confused about whether they should be using the DataStream documentation or not, when they should choose the Ray Streams API vs Flink, etc.

A couple of relevant points I can think of right off are:

  • native Python API (vs Jython for Flink)
  • performance
  • interop with Ray actors and tasks

Thanks @ericl !

Totally agree with what you have mentioned.

Here is what we summarized in the design doc before, compared with other streaming systems like Flink:

  • It can work seamlessly with other frameworks, e.g. TensorFlow, as long as they run on Ray.
  • It has good real-time performance during both normal execution and exception handling (e.g. failure recovery).
  • It stays strongly consistent in case of any failure or change.
  • It has native support for both Java and Python APIs.

What about API compatibility (i.e., what subset of the DataStream API is supported, any differences, or new APIs)?

A table summarizing all the parts of the DataStream API and their Ray status would be ideal. For example, operators, sinks, sources, options, debugging: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html

I see. Most of Flink's DataStream API should eventually be supported, but for now, much of it is still missing. We will prepare a document for the APIs we plan to support and their current status.

@ericl This is the Ray Streaming API doc and its current status: https://docs.google.com/document/d/1bwTAtxszssTwuT2AJl7z2AhaUzO2bui645jSAhebFCA/

Hi all, this is the State Management for Ray Streaming proposal: https://docs.google.com/document/d/1wGF3GgrxN4yc6i9wHFG9d4pFB75XBHLNMEu4rzW4sLg/edit#

Could you please share a comment-able version?

You can leave comments on this PR: https://github.com/ray-project/ray/pull/7348.

Cool stuff here - definitely happy to see a plan for Flink-like streaming support in Ray!

Out of curiosity, have we considered implementing an Apache Beam Runner on top of Ray Streaming as part of this plan? At a high level, I see it providing the following benefits:

  1. Exposes a unified, cross-language (Java, Python, Scala, Beam SQL, etc.) API to clients for writing streaming/batch data processing pipelines running on Ray.
  2. Clarifies exactly what common capabilities are supported by Ray Streaming versus other platforms (including Flink). See https://beam.apache.org/documentation/runners/capability-matrix/ for an example of the high-level capability types that writing a Beam pipeline runner would force its implementor to think about (it looks like we've already done a similar exercise in the Ray Streaming API doc at https://docs.google.com/document/d/1bwTAtxszssTwuT2AJl7z2AhaUzO2bui645jSAhebFCA/edit).
  3. Allows existing Beam users to port their pipelines to the Ray execution platform without completely rewriting their data processing code. This in turn helps more users naturally experiment with Ray Streaming, test its capabilities, and discover which use cases need improvement and which it is best suited to handle.
  4. Inherits support for all sources/sinks currently provided as Beam IO implementations (see https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/io/).
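
For intuition, the core job of any Beam runner can be caricatured in a few lines: walk the pipeline's transform graph and execute each step on the target engine. This toy local executor is only meant to show that shape; the names are illustrative and not Beam's actual API.

```python
class Pipeline:
    """Toy pipeline: an ordered list of named element-wise transforms."""

    def __init__(self):
        self.steps = []

    def apply(self, name, fn):
        self.steps.append((name, fn))
        return self          # allow fluent chaining

class SimpleRunner:
    """Toy runner that executes a pipeline over a local collection."""

    def run(self, pipeline, source):
        data = list(source)
        for _name, fn in pipeline.steps:
            data = [fn(x) for x in data]
        return data
```

A Ray Streaming runner would instead translate each transform into streaming operators scheduled by the JobMaster.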

If this is something that we're interested in pursuing, then I'd be happy to start contributing associated design docs, tasks, and code.

Additional Information:

  1. How Beam Runs on Top of Flink: https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
  2. Why Apache Beam? A Google Perspective: https://cloud.google.com/blog/products/gcp/why-apache-beam-a-google-perspective
  3. Apache Beam vs. Direct Spark/Flink API: https://stackoverflow.com/questions/43581127/what-are-the-benefits-of-apache-beam-over-spark-flink-for-batch-processing/43672838#43672838
  4. Implementing Beam Pipeline Runners: https://beam.apache.org/contribute/runner-guide/
  5. Streaming Systems (book largely explaining the underlying streaming model that Beam serves as a reference implementation of): http://shop.oreilly.com/product/0636920073994.do

Thanks @pdames, that's really good advice. We didn't put it in the initial plan because we were focusing on basic streaming functionality at the time. Even now, as you can see, some functionality is still not implemented. But I still think this is a good idea, and it would be great if you could join us to work on it. If interested, you can follow the API progress at any time (refer to the code and this doc: https://docs.google.com/document/d/1bwTAtxszssTwuT2AJl7z2AhaUzO2bui645jSAhebFCA/edit) and start the related work. Thanks!

Hey guys, the Ray Streaming fault tolerance implementation is also coming; here is the proposal: https://docs.google.com/document/d/1NKjGr7fi-45cEzWA-N_wJ5CoUgaJfnsW9YeWsSg1shY

The PR is being prepared and will be opened soon; you can go through the proposal to get a general idea of what will be built. Comments and suggestions are welcome.

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

Please keep this issue open for future improvements.
