RxJS update Observable array items on the fly

Im using GeoFire in an Ionic 2/Firebase APP. The main screen function gets all the users that are close by as follows, This gets fed to an ngFor loop as an array.

The issue: If one of the users details change, for instance if they change their name, then a duplicate user gets added to the array rather than the actual user getting updated. Im not sure how to work this into this function.

public outputUsersCloseToMe() : Observable<Array<displayAllUsers>> {

    const currUsr = this.firebaseInit().auth().currentUser.uid

    let availableUsers = <Array<displayAllUsers>>[];

    const loc = this.firebaseInit().database().ref("users")
                                .child(currUsr)
                                .child('profile')
                                .child('lastKnownLocation');

    loc.once('value').then( (snapshot) => {
        let geoQuery = this.geoFire.query({
            center: [snapshot.val().lat, snapshot.val().lng],
            radius: 12.0//kilometers
        });
        geoQuery.on("key_entered", (key, location, distance) => {
            this.firebaseInit().database().ref('users').child(key).child('profile').on('value', (userprofile) => {  
                if( userprofile.val() ) {
                    availableUsers.push({ 
                        id: userprofile.val().id,                        
                        gender: userprofile.val().gender,
                        distance: distance.toFixed(2),
                        location: location,
                        firstName: userprofile.val().firstName,                        
                    });        
                }
            }); 
        });
    });
    return Observable.from([availableUsers]).map(val => val); 
}

and the template is similar to the following

        
{{ usr?.id }}


Source: stackoverflow-javascript

RxJS Observable with asynchronous subscriber function

I’m trying to do something that feels like it should be straightforward, but is proving surprisingly difficult.

I have a function to subscribe to a RabbitMQ queue. Concretely, this is the Channel.consume function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

It returns a promise which is resolved with a subscription id – which is needed to unsubscribe later – and also has a callback argument to invoke when messages are pulled off the queue.

When I want to unsubscribe from the queue, I’d need to cancel the consumer using the Channel.cancel function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel. This takes the previously returned subscription id.

I want to wrap all of this stuff in an Observable that subscribes to the queue when the observable is subscribed to, and cancels the subscription when the observable is unsubscribed from. However, this is proving somewhat hard due to the ‘double-asynchronous’ nature of the calls (I mean to say that they have both a callback AND return a promise).

Ideally, the code I’d like to be able to write is:

return new Rx.Observable(async (subscriber) => {
  var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
  return async () => {
    await channel.cancel(consumeResult.consumerTag);
  };
});

However, this isn’t possible as this constructor doesn’t support async subscriber functions or teardown logic.

I’ve not been able to figure this one out. Am I missing something here? Why is this so hard?

Cheers,
Alex


Source: stackoverflow-javascript