hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::tick::NoAtomic;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::nondet::{NonDet, nondet};
22use crate::staging_util::get_this_crate;
23
24// same as the one in `hydro_std`, but internal use only
25fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
26    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
27) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
28    membership
29        .fold(
30            q!(|| false),
31            q!(|present, event| {
32                match event {
33                    MembershipEvent::Joined => *present = true,
34                    MembershipEvent::Left => *present = false,
35                }
36            }),
37        )
38        .filter_map(q!(|v| if v { Some(()) } else { None }))
39}
40
41fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
42    let root = get_this_crate();
43
44    if is_demux {
45        parse_quote! {
46            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
47                |(id, data)| {
48                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
49                }
50            )
51        }
52    } else {
53        parse_quote! {
54            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
55                |data| {
56                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
57                }
58            )
59        }
60    }
61}
62
63pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
64    serialize_bincode_with_type(is_demux, &quote_type::<T>())
65}
66
67fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
68    let root = get_this_crate();
69
70    if let Some(c_type) = tagged {
71        parse_quote! {
72            |res| {
73                let (id, b) = res.unwrap();
74                (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75            }
76        }
77    } else {
78        parse_quote! {
79            |res| {
80                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81            }
82        }
83    }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87    deserialize_bincode_with_type(tagged, &quote_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
92    /// using [`bincode`] to serialize/deserialize messages.
93    ///
94    /// The returned stream captures the elements received at the destination, where values will
95    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
96    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
97    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
98    /// dropped no further messages will be sent.
99    ///
100    /// # Example
101    /// ```rust
102    /// # use hydro_lang::prelude::*;
103    /// # use futures::StreamExt;
104    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
105    /// let p1 = flow.process::<()>();
106    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
107    /// let p2 = flow.process::<()>();
108    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
109    /// // 1, 2, 3
110    /// # on_p2.send_bincode(&p_out)
111    /// # }, |mut stream| async move {
112    /// # for w in 1..=3 {
113    /// #     assert_eq!(stream.next().await, Some(w));
114    /// # }
115    /// # }));
116    /// ```
117    pub fn send_bincode<L2>(
118        self,
119        other: &Process<'a, L2>,
120    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
121    where
122        T: Serialize + DeserializeOwned,
123    {
124        let serialize_pipeline = Some(serialize_bincode::<T>(false));
125
126        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
127
128        Stream::new(
129            other.clone(),
130            HydroNode::Network {
131                serialize_fn: serialize_pipeline.map(|e| e.into()),
132                instantiate_fn: DebugInstantiate::Building,
133                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
134                input: Box::new(self.ir_node.into_inner()),
135                metadata: other.new_node_metadata::<T>(),
136            },
137        )
138    }
139
140    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
141    /// using [`bincode`] to serialize/deserialize messages.
142    ///
143    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
144    /// membership information. This is a common pattern in distributed systems for broadcasting data to
145    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
146    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
147    /// each element to all cluster members.
148    ///
149    /// # Non-Determinism
150    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
151    /// to the current cluster members _at that point in time_. Depending on when we are notified of
152    /// membership changes, we will broadcast each element to different members.
153    ///
154    /// # Example
155    /// ```rust
156    /// # use hydro_lang::prelude::*;
157    /// # use futures::StreamExt;
158    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
159    /// let p1 = flow.process::<()>();
160    /// let workers: Cluster<()> = flow.cluster::<()>();
161    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
162    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
163    /// # on_worker.send_bincode(&p2).entries()
164    /// // if there are 4 members in the cluster, each receives one element
165    /// // - MemberId::<()>(0): [123]
166    /// // - MemberId::<()>(1): [123]
167    /// // - MemberId::<()>(2): [123]
168    /// // - MemberId::<()>(3): [123]
169    /// # }, |mut stream| async move {
170    /// # let mut results = Vec::new();
171    /// # for w in 0..4 {
172    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
173    /// # }
174    /// # results.sort();
175    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
176    /// # }));
177    /// ```
178    pub fn broadcast_bincode<L2: 'a>(
179        self,
180        other: &Cluster<'a, L2>,
181        nondet_membership: NonDet,
182    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
183    where
184        T: Clone + Serialize + DeserializeOwned,
185    {
186        let ids = track_membership(self.location.source_cluster_members(other));
187        let join_tick = self.location.tick();
188        let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
189
190        current_members
191            .weaker_retries()
192            .assume_ordering::<TotalOrder>(
193                nondet!(/** we send to each member independently, order does not matter */),
194            )
195            .cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
196            .weaken_ordering::<O>()
197            .all_ticks()
198            .demux_bincode(other)
199    }
200
201    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
202    /// serialization. The external process can receive these elements by establishing a TCP
203    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
204    ///
205    /// # Example
206    /// ```rust
207    /// # use hydro_lang::prelude::*;
208    /// # use futures::StreamExt;
209    /// # tokio_test::block_on(async move {
210    /// let flow = FlowBuilder::new();
211    /// let process = flow.process::<()>();
212    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
213    /// let external = flow.external::<()>();
214    /// let external_handle = numbers.send_bincode_external(&external);
215    ///
216    /// let mut deployment = hydro_deploy::Deployment::new();
217    /// let nodes = flow
218    ///     .with_process(&process, deployment.Localhost())
219    ///     .with_external(&external, deployment.Localhost())
220    ///     .deploy(&mut deployment);
221    ///
222    /// deployment.deploy().await.unwrap();
223    /// // establish the TCP connection
224    /// let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
225    /// deployment.start().await.unwrap();
226    ///
227    /// for w in 1..=3 {
228    ///     assert_eq!(external_recv_stream.next().await, Some(w));
229    /// }
230    /// # });
231    /// ```
232    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
233    where
234        T: Serialize + DeserializeOwned,
235    {
236        let serialize_pipeline = Some(serialize_bincode::<T>(false));
237
238        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
239
240        let external_key = flow_state_borrow.next_external_out;
241        flow_state_borrow.next_external_out += 1;
242
243        flow_state_borrow.push_root(HydroRoot::SendExternal {
244            to_external_id: other.id,
245            to_key: external_key,
246            to_many: false,
247            serialize_fn: serialize_pipeline.map(|e| e.into()),
248            instantiate_fn: DebugInstantiate::Building,
249            input: Box::new(HydroNode::Unpersist {
250                inner: Box::new(self.ir_node.into_inner()),
251                metadata: self.location.new_node_metadata::<T>(),
252            }),
253            op_metadata: HydroIrOpMetadata::new(),
254        });
255
256        ExternalBincodeStream {
257            process_id: other.id,
258            port_id: external_key,
259            _phantom: PhantomData,
260        }
261    }
262}
263
264impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
265    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
266{
267    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
268    /// using [`bincode`] to serialize/deserialize messages.
269    ///
270    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
271    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
272    /// this API allows precise targeting of specific cluster members rather than broadcasting to
273    /// all members.
274    ///
275    /// # Example
276    /// ```rust
277    /// # use hydro_lang::prelude::*;
278    /// # use futures::StreamExt;
279    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
280    /// let p1 = flow.process::<()>();
281    /// let workers: Cluster<()> = flow.cluster::<()>();
282    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
283    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
284    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
285    ///     .demux_bincode(&workers);
286    /// # on_worker.send_bincode(&p2).entries()
287    /// // if there are 4 members in the cluster, each receives one element
288    /// // - MemberId::<()>(0): [0]
289    /// // - MemberId::<()>(1): [1]
290    /// // - MemberId::<()>(2): [2]
291    /// // - MemberId::<()>(3): [3]
292    /// # }, |mut stream| async move {
293    /// # let mut results = Vec::new();
294    /// # for w in 0..4 {
295    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
296    /// # }
297    /// # results.sort();
298    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
299    /// # }));
300    /// ```
301    pub fn demux_bincode(
302        self,
303        other: &Cluster<'a, L2>,
304    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
305    where
306        T: Serialize + DeserializeOwned,
307    {
308        self.into_keyed().demux_bincode(other)
309    }
310}
311
312impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
313    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
314    /// [`bincode`] to serialize/deserialize messages.
315    ///
316    /// This provides load balancing by evenly distributing work across cluster members. The
317    /// distribution is deterministic based on element order - the first element goes to member 0,
318    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
319    ///
320    /// # Non-Determinism
321    /// The set of cluster members may asynchronously change over time. Each element is distributed
322    /// based on the current cluster membership _at that point in time_. Depending on when cluster
323    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
324    /// membership is stable, the order of members in the round-robin pattern may change across runs.
325    ///
326    /// # Ordering Requirements
327    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
328    /// order of messages and retries affects the round-robin pattern.
329    ///
330    /// # Example
331    /// ```rust
332    /// # use hydro_lang::prelude::*;
333    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
334    /// # use futures::StreamExt;
335    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
336    /// let p1 = flow.process::<()>();
337    /// let workers: Cluster<()> = flow.cluster::<()>();
338    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
339    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
340    /// on_worker.send_bincode(&p2)
341    /// # .first().values() // we use first to assert that each member gets one element
342    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
343    /// // - MemberId::<()>(?): [1]
344    /// // - MemberId::<()>(?): [2]
345    /// // - MemberId::<()>(?): [3]
346    /// // - MemberId::<()>(?): [4]
347    /// # }, |mut stream| async move {
348    /// # let mut results = Vec::new();
349    /// # for w in 0..4 {
350    /// #     results.push(stream.next().await.unwrap());
351    /// # }
352    /// # results.sort();
353    /// # assert_eq!(results, vec![1, 2, 3, 4]);
354    /// # }));
355    /// ```
356    pub fn round_robin_bincode<L2: 'a>(
357        self,
358        other: &Cluster<'a, L2>,
359        nondet_membership: NonDet,
360    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
361    where
362        T: Serialize + DeserializeOwned,
363    {
364        let ids = track_membership(self.location.source_cluster_members(other));
365        let join_tick = self.location.tick();
366        let current_members = ids
367            .snapshot(&join_tick, nondet_membership)
368            .keys()
369            .assume_ordering(nondet_membership)
370            .collect_vec();
371
372        self.enumerate()
373            .batch(&join_tick, nondet_membership)
374            .cross_singleton(current_members)
375            .map(q!(|(data, members)| (
376                members[data.0 % members.len()],
377                data.1
378            )))
379            .all_ticks()
380            .demux_bincode(other)
381    }
382}
383
384impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
385    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
386    /// using [`bincode`] to serialize/deserialize messages.
387    ///
388    /// Each cluster member sends its local stream elements, and they are collected at the destination
389    /// as a [`KeyedStream`] where keys identify the source cluster member.
390    ///
391    /// # Example
392    /// ```rust
393    /// # use hydro_lang::prelude::*;
394    /// # use futures::StreamExt;
395    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
396    /// let workers: Cluster<()> = flow.cluster::<()>();
397    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
398    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
399    /// # all_received.entries()
400    /// # }, |mut stream| async move {
401    /// // if there are 4 members in the cluster, we should receive 4 elements
402    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
403    /// # let mut results = Vec::new();
404    /// # for w in 0..4 {
405    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
406    /// # }
407    /// # results.sort();
408    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
409    /// # }));
410    /// ```
411    ///
412    /// If you don't need to know the source for each element, you can use `.values()`
413    /// to get just the data:
414    /// ```rust
415    /// # use hydro_lang::prelude::*;
416    /// # use hydro_lang::live_collections::stream::NoOrder;
417    /// # use futures::StreamExt;
418    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
419    /// # let workers: Cluster<()> = flow.cluster::<()>();
420    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
421    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
422    /// # values
423    /// # }, |mut stream| async move {
424    /// # let mut results = Vec::new();
425    /// # for w in 0..4 {
426    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
427    /// # }
428    /// # results.sort();
429    /// // if there are 4 members in the cluster, we should receive 4 elements
430    /// // 1, 1, 1, 1
431    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
432    /// # }));
433    /// ```
434    pub fn send_bincode<L2>(
435        self,
436        other: &Process<'a, L2>,
437    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
438    where
439        T: Serialize + DeserializeOwned,
440    {
441        let serialize_pipeline = Some(serialize_bincode::<T>(false));
442
443        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
444
445        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
446            other.clone(),
447            HydroNode::Network {
448                serialize_fn: serialize_pipeline.map(|e| e.into()),
449                instantiate_fn: DebugInstantiate::Building,
450                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
451                input: Box::new(self.ir_node.into_inner()),
452                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
453            },
454        );
455
456        raw_stream.into_keyed()
457    }
458
459    /// Broadcasts elements of this stream at each source member to all members of a destination
460    /// cluster, using [`bincode`] to serialize/deserialize messages.
461    ///
462    /// Each source member sends each of its stream elements to **every** member of the cluster
463    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
464    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
465    /// **only data elements** and sends each element to all cluster members.
466    ///
467    /// # Non-Determinism
468    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
469    /// to the current cluster members known _at that point in time_ at the source member. Depending
470    /// on when each source member is notified of membership changes, it will broadcast each element
471    /// to different members.
472    ///
473    /// # Example
474    /// ```rust
475    /// # use hydro_lang::prelude::*;
476    /// # use hydro_lang::location::MemberId;
477    /// # use futures::StreamExt;
478    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
479    /// # type Source = ();
480    /// # type Destination = ();
481    /// let source: Cluster<Source> = flow.cluster::<Source>();
482    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
483    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
484    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
485    /// # on_destination.entries().send_bincode(&p2).entries()
486    /// // if there are 4 members in the desination, each receives one element from each source member
487    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
488    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
489    /// // - ...
490    /// # }, |mut stream| async move {
491    /// # let mut results = Vec::new();
492    /// # for w in 0..16 {
493    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
494    /// # }
495    /// # results.sort();
496    /// # assert_eq!(results, vec![
497    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
498    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
499    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
500    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
501    /// # ]);
502    /// # }));
503    /// ```
504    pub fn broadcast_bincode<L2: 'a>(
505        self,
506        other: &Cluster<'a, L2>,
507        nondet_membership: NonDet,
508    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
509    where
510        T: Clone + Serialize + DeserializeOwned,
511    {
512        let ids = track_membership(self.location.source_cluster_members(other));
513        let join_tick = self.location.tick();
514        let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
515
516        current_members
517            .weaker_retries()
518            .assume_ordering::<TotalOrder>(
519                nondet!(/** we send to each member independently, order does not matter */),
520            )
521            .cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
522            .all_ticks()
523            .demux_bincode(other)
524    }
525}
526
527impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
528    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
529{
530    /// Sends elements of this stream at each source member to specific members of a destination
531    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
532    ///
533    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
534    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
535    /// this API allows precise targeting of specific cluster members rather than broadcasting to
536    /// all members.
537    ///
538    /// Each cluster member sends its local stream elements, and they are collected at each
539    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
540    ///
541    /// # Example
542    /// ```rust
543    /// # use hydro_lang::prelude::*;
544    /// # use futures::StreamExt;
545    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
546    /// # type Source = ();
547    /// # type Destination = ();
548    /// let source: Cluster<Source> = flow.cluster::<Source>();
549    /// let to_send: Stream<_, Cluster<_>, _> = source
550    ///     .source_iter(q!(vec![0, 1, 2, 3]))
551    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
552    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
553    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
554    /// # all_received.entries().send_bincode(&p2).entries()
555    /// # }, |mut stream| async move {
556    /// // if there are 4 members in the destination cluster, each receives one message from each source member
557    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
558    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
559    /// // - ...
560    /// # let mut results = Vec::new();
561    /// # for w in 0..16 {
562    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
563    /// # }
564    /// # results.sort();
565    /// # assert_eq!(results, vec![
566    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
567    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
568    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
569    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
570    /// # ]);
571    /// # }));
572    /// ```
573    pub fn demux_bincode(
574        self,
575        other: &Cluster<'a, L2>,
576    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
577    where
578        T: Serialize + DeserializeOwned,
579    {
580        self.into_keyed().demux_bincode(other)
581    }
582}