hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::tick::NoAtomic;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::nondet::{NonDet, nondet};
22use crate::staging_util::get_this_crate;
23
24// same as the one in `hydro_std`, but internal use only
25fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
26 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
27) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
28 membership
29 .fold(
30 q!(|| false),
31 q!(|present, event| {
32 match event {
33 MembershipEvent::Joined => *present = true,
34 MembershipEvent::Left => *present = false,
35 }
36 }),
37 )
38 .filter_map(q!(|v| if v { Some(()) } else { None }))
39}
40
41fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
42 let root = get_this_crate();
43
44 if is_demux {
45 parse_quote! {
46 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
47 |(id, data)| {
48 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
49 }
50 )
51 }
52 } else {
53 parse_quote! {
54 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
55 |data| {
56 #root::runtime_support::bincode::serialize(&data).unwrap().into()
57 }
58 )
59 }
60 }
61}
62
63pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
64 serialize_bincode_with_type(is_demux, "e_type::<T>())
65}
66
67fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
68 let root = get_this_crate();
69
70 if let Some(c_type) = tagged {
71 parse_quote! {
72 |res| {
73 let (id, b) = res.unwrap();
74 (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75 }
76 }
77 } else {
78 parse_quote! {
79 |res| {
80 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81 }
82 }
83 }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87 deserialize_bincode_with_type(tagged, "e_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
92 /// using [`bincode`] to serialize/deserialize messages.
93 ///
94 /// The returned stream captures the elements received at the destination, where values will
95 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
96 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
97 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
98 /// dropped no further messages will be sent.
99 ///
100 /// # Example
101 /// ```rust
102 /// # use hydro_lang::prelude::*;
103 /// # use futures::StreamExt;
104 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
105 /// let p1 = flow.process::<()>();
106 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
107 /// let p2 = flow.process::<()>();
108 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
109 /// // 1, 2, 3
110 /// # on_p2.send_bincode(&p_out)
111 /// # }, |mut stream| async move {
112 /// # for w in 1..=3 {
113 /// # assert_eq!(stream.next().await, Some(w));
114 /// # }
115 /// # }));
116 /// ```
117 pub fn send_bincode<L2>(
118 self,
119 other: &Process<'a, L2>,
120 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
121 where
122 T: Serialize + DeserializeOwned,
123 {
124 let serialize_pipeline = Some(serialize_bincode::<T>(false));
125
126 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
127
128 Stream::new(
129 other.clone(),
130 HydroNode::Network {
131 serialize_fn: serialize_pipeline.map(|e| e.into()),
132 instantiate_fn: DebugInstantiate::Building,
133 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
134 input: Box::new(self.ir_node.into_inner()),
135 metadata: other.new_node_metadata::<T>(),
136 },
137 )
138 }
139
140 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
141 /// using [`bincode`] to serialize/deserialize messages.
142 ///
143 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
144 /// membership information. This is a common pattern in distributed systems for broadcasting data to
145 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
146 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
147 /// each element to all cluster members.
148 ///
149 /// # Non-Determinism
150 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
151 /// to the current cluster members _at that point in time_. Depending on when we are notified of
152 /// membership changes, we will broadcast each element to different members.
153 ///
154 /// # Example
155 /// ```rust
156 /// # use hydro_lang::prelude::*;
157 /// # use futures::StreamExt;
158 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
159 /// let p1 = flow.process::<()>();
160 /// let workers: Cluster<()> = flow.cluster::<()>();
161 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
162 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
163 /// # on_worker.send_bincode(&p2).entries()
164 /// // if there are 4 members in the cluster, each receives one element
165 /// // - MemberId::<()>(0): [123]
166 /// // - MemberId::<()>(1): [123]
167 /// // - MemberId::<()>(2): [123]
168 /// // - MemberId::<()>(3): [123]
169 /// # }, |mut stream| async move {
170 /// # let mut results = Vec::new();
171 /// # for w in 0..4 {
172 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
173 /// # }
174 /// # results.sort();
175 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
176 /// # }));
177 /// ```
178 pub fn broadcast_bincode<L2: 'a>(
179 self,
180 other: &Cluster<'a, L2>,
181 nondet_membership: NonDet,
182 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
183 where
184 T: Clone + Serialize + DeserializeOwned,
185 {
186 let ids = track_membership(self.location.source_cluster_members(other));
187 let join_tick = self.location.tick();
188 let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
189
190 current_members
191 .weaker_retries()
192 .assume_ordering::<TotalOrder>(
193 nondet!(/** we send to each member independently, order does not matter */),
194 )
195 .cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
196 .weaken_ordering::<O>()
197 .all_ticks()
198 .demux_bincode(other)
199 }
200
201 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
202 /// serialization. The external process can receive these elements by establishing a TCP
203 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
204 ///
205 /// # Example
206 /// ```rust
207 /// # use hydro_lang::prelude::*;
208 /// # use futures::StreamExt;
209 /// # tokio_test::block_on(async move {
210 /// let flow = FlowBuilder::new();
211 /// let process = flow.process::<()>();
212 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
213 /// let external = flow.external::<()>();
214 /// let external_handle = numbers.send_bincode_external(&external);
215 ///
216 /// let mut deployment = hydro_deploy::Deployment::new();
217 /// let nodes = flow
218 /// .with_process(&process, deployment.Localhost())
219 /// .with_external(&external, deployment.Localhost())
220 /// .deploy(&mut deployment);
221 ///
222 /// deployment.deploy().await.unwrap();
223 /// // establish the TCP connection
224 /// let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
225 /// deployment.start().await.unwrap();
226 ///
227 /// for w in 1..=3 {
228 /// assert_eq!(external_recv_stream.next().await, Some(w));
229 /// }
230 /// # });
231 /// ```
232 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
233 where
234 T: Serialize + DeserializeOwned,
235 {
236 let serialize_pipeline = Some(serialize_bincode::<T>(false));
237
238 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
239
240 let external_key = flow_state_borrow.next_external_out;
241 flow_state_borrow.next_external_out += 1;
242
243 flow_state_borrow.push_root(HydroRoot::SendExternal {
244 to_external_id: other.id,
245 to_key: external_key,
246 to_many: false,
247 serialize_fn: serialize_pipeline.map(|e| e.into()),
248 instantiate_fn: DebugInstantiate::Building,
249 input: Box::new(HydroNode::Unpersist {
250 inner: Box::new(self.ir_node.into_inner()),
251 metadata: self.location.new_node_metadata::<T>(),
252 }),
253 op_metadata: HydroIrOpMetadata::new(),
254 });
255
256 ExternalBincodeStream {
257 process_id: other.id,
258 port_id: external_key,
259 _phantom: PhantomData,
260 }
261 }
262}
263
264impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
265 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
266{
267 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
268 /// using [`bincode`] to serialize/deserialize messages.
269 ///
270 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
271 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
272 /// this API allows precise targeting of specific cluster members rather than broadcasting to
273 /// all members.
274 ///
275 /// # Example
276 /// ```rust
277 /// # use hydro_lang::prelude::*;
278 /// # use futures::StreamExt;
279 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
280 /// let p1 = flow.process::<()>();
281 /// let workers: Cluster<()> = flow.cluster::<()>();
282 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
283 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
284 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
285 /// .demux_bincode(&workers);
286 /// # on_worker.send_bincode(&p2).entries()
287 /// // if there are 4 members in the cluster, each receives one element
288 /// // - MemberId::<()>(0): [0]
289 /// // - MemberId::<()>(1): [1]
290 /// // - MemberId::<()>(2): [2]
291 /// // - MemberId::<()>(3): [3]
292 /// # }, |mut stream| async move {
293 /// # let mut results = Vec::new();
294 /// # for w in 0..4 {
295 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
296 /// # }
297 /// # results.sort();
298 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
299 /// # }));
300 /// ```
301 pub fn demux_bincode(
302 self,
303 other: &Cluster<'a, L2>,
304 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
305 where
306 T: Serialize + DeserializeOwned,
307 {
308 self.into_keyed().demux_bincode(other)
309 }
310}
311
312impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
313 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
314 /// [`bincode`] to serialize/deserialize messages.
315 ///
316 /// This provides load balancing by evenly distributing work across cluster members. The
317 /// distribution is deterministic based on element order - the first element goes to member 0,
318 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
319 ///
320 /// # Non-Determinism
321 /// The set of cluster members may asynchronously change over time. Each element is distributed
322 /// based on the current cluster membership _at that point in time_. Depending on when cluster
323 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
324 /// membership is stable, the order of members in the round-robin pattern may change across runs.
325 ///
326 /// # Ordering Requirements
327 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
328 /// order of messages and retries affects the round-robin pattern.
329 ///
330 /// # Example
331 /// ```rust
332 /// # use hydro_lang::prelude::*;
333 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
334 /// # use futures::StreamExt;
335 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
336 /// let p1 = flow.process::<()>();
337 /// let workers: Cluster<()> = flow.cluster::<()>();
338 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
339 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
340 /// on_worker.send_bincode(&p2)
341 /// # .first().values() // we use first to assert that each member gets one element
342 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
343 /// // - MemberId::<()>(?): [1]
344 /// // - MemberId::<()>(?): [2]
345 /// // - MemberId::<()>(?): [3]
346 /// // - MemberId::<()>(?): [4]
347 /// # }, |mut stream| async move {
348 /// # let mut results = Vec::new();
349 /// # for w in 0..4 {
350 /// # results.push(stream.next().await.unwrap());
351 /// # }
352 /// # results.sort();
353 /// # assert_eq!(results, vec![1, 2, 3, 4]);
354 /// # }));
355 /// ```
356 pub fn round_robin_bincode<L2: 'a>(
357 self,
358 other: &Cluster<'a, L2>,
359 nondet_membership: NonDet,
360 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
361 where
362 T: Serialize + DeserializeOwned,
363 {
364 let ids = track_membership(self.location.source_cluster_members(other));
365 let join_tick = self.location.tick();
366 let current_members = ids
367 .snapshot(&join_tick, nondet_membership)
368 .keys()
369 .assume_ordering(nondet_membership)
370 .collect_vec();
371
372 self.enumerate()
373 .batch(&join_tick, nondet_membership)
374 .cross_singleton(current_members)
375 .map(q!(|(data, members)| (
376 members[data.0 % members.len()],
377 data.1
378 )))
379 .all_ticks()
380 .demux_bincode(other)
381 }
382}
383
384impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
385 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
386 /// using [`bincode`] to serialize/deserialize messages.
387 ///
388 /// Each cluster member sends its local stream elements, and they are collected at the destination
389 /// as a [`KeyedStream`] where keys identify the source cluster member.
390 ///
391 /// # Example
392 /// ```rust
393 /// # use hydro_lang::prelude::*;
394 /// # use futures::StreamExt;
395 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
396 /// let workers: Cluster<()> = flow.cluster::<()>();
397 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
398 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
399 /// # all_received.entries()
400 /// # }, |mut stream| async move {
401 /// // if there are 4 members in the cluster, we should receive 4 elements
402 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
403 /// # let mut results = Vec::new();
404 /// # for w in 0..4 {
405 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
406 /// # }
407 /// # results.sort();
408 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
409 /// # }));
410 /// ```
411 ///
412 /// If you don't need to know the source for each element, you can use `.values()`
413 /// to get just the data:
414 /// ```rust
415 /// # use hydro_lang::prelude::*;
416 /// # use hydro_lang::live_collections::stream::NoOrder;
417 /// # use futures::StreamExt;
418 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
419 /// # let workers: Cluster<()> = flow.cluster::<()>();
420 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
421 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
422 /// # values
423 /// # }, |mut stream| async move {
424 /// # let mut results = Vec::new();
425 /// # for w in 0..4 {
426 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
427 /// # }
428 /// # results.sort();
429 /// // if there are 4 members in the cluster, we should receive 4 elements
430 /// // 1, 1, 1, 1
431 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
432 /// # }));
433 /// ```
434 pub fn send_bincode<L2>(
435 self,
436 other: &Process<'a, L2>,
437 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
438 where
439 T: Serialize + DeserializeOwned,
440 {
441 let serialize_pipeline = Some(serialize_bincode::<T>(false));
442
443 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
444
445 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
446 other.clone(),
447 HydroNode::Network {
448 serialize_fn: serialize_pipeline.map(|e| e.into()),
449 instantiate_fn: DebugInstantiate::Building,
450 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
451 input: Box::new(self.ir_node.into_inner()),
452 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
453 },
454 );
455
456 raw_stream.into_keyed()
457 }
458
459 /// Broadcasts elements of this stream at each source member to all members of a destination
460 /// cluster, using [`bincode`] to serialize/deserialize messages.
461 ///
462 /// Each source member sends each of its stream elements to **every** member of the cluster
463 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
464 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
465 /// **only data elements** and sends each element to all cluster members.
466 ///
467 /// # Non-Determinism
468 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
469 /// to the current cluster members known _at that point in time_ at the source member. Depending
470 /// on when each source member is notified of membership changes, it will broadcast each element
471 /// to different members.
472 ///
473 /// # Example
474 /// ```rust
475 /// # use hydro_lang::prelude::*;
476 /// # use hydro_lang::location::MemberId;
477 /// # use futures::StreamExt;
478 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
479 /// # type Source = ();
480 /// # type Destination = ();
481 /// let source: Cluster<Source> = flow.cluster::<Source>();
482 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
483 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
484 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
485 /// # on_destination.entries().send_bincode(&p2).entries()
486 /// // if there are 4 members in the desination, each receives one element from each source member
487 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
488 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
489 /// // - ...
490 /// # }, |mut stream| async move {
491 /// # let mut results = Vec::new();
492 /// # for w in 0..16 {
493 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
494 /// # }
495 /// # results.sort();
496 /// # assert_eq!(results, vec![
497 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
498 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
499 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
500 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
501 /// # ]);
502 /// # }));
503 /// ```
504 pub fn broadcast_bincode<L2: 'a>(
505 self,
506 other: &Cluster<'a, L2>,
507 nondet_membership: NonDet,
508 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
509 where
510 T: Clone + Serialize + DeserializeOwned,
511 {
512 let ids = track_membership(self.location.source_cluster_members(other));
513 let join_tick = self.location.tick();
514 let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
515
516 current_members
517 .weaker_retries()
518 .assume_ordering::<TotalOrder>(
519 nondet!(/** we send to each member independently, order does not matter */),
520 )
521 .cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
522 .all_ticks()
523 .demux_bincode(other)
524 }
525}
526
527impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
528 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
529{
530 /// Sends elements of this stream at each source member to specific members of a destination
531 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
532 ///
533 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
534 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
535 /// this API allows precise targeting of specific cluster members rather than broadcasting to
536 /// all members.
537 ///
538 /// Each cluster member sends its local stream elements, and they are collected at each
539 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
540 ///
541 /// # Example
542 /// ```rust
543 /// # use hydro_lang::prelude::*;
544 /// # use futures::StreamExt;
545 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
546 /// # type Source = ();
547 /// # type Destination = ();
548 /// let source: Cluster<Source> = flow.cluster::<Source>();
549 /// let to_send: Stream<_, Cluster<_>, _> = source
550 /// .source_iter(q!(vec![0, 1, 2, 3]))
551 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
552 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
553 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
554 /// # all_received.entries().send_bincode(&p2).entries()
555 /// # }, |mut stream| async move {
556 /// // if there are 4 members in the destination cluster, each receives one message from each source member
557 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
558 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
559 /// // - ...
560 /// # let mut results = Vec::new();
561 /// # for w in 0..16 {
562 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
563 /// # }
564 /// # results.sort();
565 /// # assert_eq!(results, vec![
566 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
567 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
568 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
569 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
570 /// # ]);
571 /// # }));
572 /// ```
573 pub fn demux_bincode(
574 self,
575 other: &Cluster<'a, L2>,
576 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
577 where
578 T: Serialize + DeserializeOwned,
579 {
580 self.into_keyed().demux_bincode(other)
581 }
582}