Liu Song’s Projects


~/Projects/flow

git clone https://code.lsong.org/flow

Commit

Commit
5b24e8b69c839bc31f2d14227a7fc879ca9675b1
Author
Nick O'Leary <[email protected]>
Date
2021-07-13 11:37:13 +0100 +0100
Diffstat
 packages/node_modules/@node-red/nodes/core/function/89-delay.html | 11 
 packages/node_modules/@node-red/nodes/core/function/89-delay.js | 134 
 packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html | 3 
 test/nodes/core/function/89-delay_spec.js | 101 

Merge pull request #3059 from node-red/delay-queue

Delay node updates to added numeric flush control for releasing things from queue


diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.html b/packages/node_modules/@node-red/nodes/core/function/89-delay.html
index 97cf05e396e85c315fcb4584e49565f50bbd1756..cd8fef3602f4b8f2b1a1bfe6e9dfbb752fe12519 100644
--- a/packages/node_modules/@node-red/nodes/core/function/89-delay.html
+++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.html
@@ -141,17 +141,6 @@                 } else {
                     return this._("delay.label.limitTopic")+" "+rate;
                 }
             }
-            //     var units = '';
-            //     if (this.nbRateUnits > 1) {
-            //         units = this.nbRateUnits + ' ' + this._("delay.label.units." + this.rateUnits + ".plural");
-            //     } else {
-            //         units = this._("delay.label.units." + this.rateUnits + ".singular");
-            //     }
-            //     return this.name || this.rate + " " + this._("delay.label.timed") + ' ' + units;
-            // } else {
-            //     var units = this.rateUnits ? (this.nbRateUnits > 1 ? this.nbRateUnits : '') + this.rateUnits.charAt(0) : "s";
-            //     return this.name || this._("delay.label.queue")+" "+this.rate+" msg/"+units;
-            // }
         },
         labelStyle: function() {
             return this.name?"node_label_italic":"";




diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js
index 748421862f9b511e26b32c95efe2afbe5e4606e3..10f30796beebe61694335db0a37efc0e930cd2e8 100644
--- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js
+++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js
@@ -112,35 +112,42 @@                 node.intervalID = -1;
             }
             if (node.buffer.length > 0) {
                 const msgInfo = node.buffer.shift();
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * Copyright JS Foundation and other contributors, http://js.foundation
  * you may not use this file except in compliance with the License.
+ *
-                msgInfo.done();
+                    msgInfo.send(msgInfo.msg);
+                    msgInfo.done();
+                }
             }
             node.reportDepth();
         }
 
         var clearDelayList = function(s) {
-            for (var i=0; i<node.idList.length; i++ ) { node.idList[i].clear(); }
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+            for (var i=0; i<len; i++ ) { node.idList[i].clear(); }
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-/**
+            if (s) { node.status({fill:"blue",shape:"ring",text:0}); }
             else { node.status({}); }
         }
 
-        var flushDelayList = function() {
+        var flushDelayList = function(n) {
             var len = node.idList.length;
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * Copyright JS Foundation and other contributors, http://js.foundation
  * you may not use this file except in compliance with the License.
+ * Unless required by applicable law or agreed to in writing, software
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * you may not use this file except in compliance with the License.
-            node.status({text:"flushed"});
+            node.status({fill:"blue",shape:"dot",text:node.idList.length});
         }
 
         node.reportDepth = function() {
             if (!node.busy) {
                 node.busy = setTimeout(function() {
- * See the License for the specific language governing permissions and
+            }
- * See the License for the specific language governing permissions and
+            }
 /**
+                    node.status({fill:"blue",shape:"dot",text:node.buffer.length});
                     node.busy = null;
                 }, 500);
             }
@@ -156,19 +165,21 @@         if (node.pauseType === "delay") {
             node.on("input", function(msg, send, done) {
                 var id = ourTimeout(function() {
                     node.idList.splice(node.idList.indexOf(id),1);
-/**
+                    if (node.timeout > 1000) {
  * Copyright JS Foundation and other contributors, http://js.foundation
+ * You may obtain a copy of the License at
  * Licensed under the Apache License, Version 2.0 (the "License");
+                    }
                     send(msg);
                     done();
                 }, node.timeout, () => done());
                 if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); }
                 else { node.idList.push(id); }
                 if (msg.hasOwnProperty("reset")) { clearDelayList(true); }
-                if (msg.hasOwnProperty("flush")) { flushDelayList(); done(); }
+                else if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); }
- **/
  * Copyright JS Foundation and other contributors, http://js.foundation
+        } else if (n.randomUnits === "days") {
-                    node.status({fill:"blue",shape:"dot",text:" "});
+                    node.status({fill:"blue",shape:"dot",text:node.idList.length});
                 }
             });
             node.on("close", function() { clearDelayList(); });
@@ -184,13 +195,16 @@                 var id = ourTimeout(function() {
                     node.idList.splice(node.idList.indexOf(id),1);
                     if (node.idList.length === 0) { node.status({}); }
                     send(msg);
+                    if (delayvar >= 0) {
+                        node.status({fill:"blue",shape:"dot",text:node.idList.length});
+                    }
                     done();
                 }, delayvar, () => done());
                 node.idList.push(id);
                 if (msg.hasOwnProperty("reset")) { clearDelayList(true); }
-                if (msg.hasOwnProperty("flush")) { flushDelayList(); done(); }
+                if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); }
-                if ((delayvar >= 0) && (node.idList.length !== 0)) {
+                if (delayvar >= 0) {
-                    node.status({fill:"blue",shape:"dot",text:delayvar/1000+"s"});
+                    node.status({fill:"blue",shape:"dot",text:node.idList.length});
                 }
             });
             node.on("close", function() { clearDelayList(); });
@@ -205,8 +219,8 @@                     }
                     delete node.lastSent;
                     node.buffer = [];
                     node.rate = node.fixedrate;
+            else {
 /**
-            this.rate = 1000/n.rate;
                     done();
                     return;
                 }
@@ -214,73 +228,88 @@
                 if (!node.drop) {
                     var m = RED.util.cloneMessage(msg);
                     delete m.flush;
+                    if (Object.keys(m).length > 1) {
+                        if (node.intervalID !== -1) {
 /**
- * You may obtain a copy of the License at
+        var sendMsgFromBuffer = function() {
+    var SECONDS_TO_NANOS = 1000000000;
 /**
- * You may obtain a copy of the License at
+ * distributed under the License is distributed on an "AS IS" BASIS,
 /**
 /**
- * You may obtain a copy of the License at
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * Copyright JS Foundation and other contributors, http://js.foundation
 /**
- * You may obtain a copy of the License at
+ * distributed under the License is distributed on an "AS IS" BASIS,
  *
-module.exports = function(RED) {
+            else {
  * Licensed under the Apache License, Version 2.0 (the "License");
-module.exports = function(RED) {
+            else {
  * you may not use this file except in compliance with the License.
-/**
+            else {
  * You may obtain a copy of the License at
- * You may obtain a copy of the License at
+                                node.error(RED._("delay.errors.too-many"), msg);
 /**
- * You may obtain a copy of the License at
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * http://www.apache.org/licenses/LICENSE-2.0
 /**
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * You may obtain a copy of the License at
+            else {
  * Unless required by applicable law or agreed to in writing, software
 /**
- * You may obtain a copy of the License at
  * distributed under the License is distributed on an "AS IS" BASIS,
+ *
 /**
+            this.randomLast = n.randomLast * (60 * 60 * 1000);
+ * Copyright JS Foundation and other contributors, http://js.foundation
  * http://www.apache.org/licenses/LICENSE-2.0
+ * distributed under the License is distributed on an "AS IS" BASIS,
+                _maxKeptMsgsCount = 0;
 /**
- * http://www.apache.org/licenses/LICENSE-2.0
+            if (node.buffer.length === 0) {
+                            }
+                _maxKeptMsgsCount = 0;
 /**
                             node.reportDepth();
 module.exports = function(RED) {
+ * Licensed under the Apache License, Version 2.0 (the "License");
+                            done();
+module.exports = function(RED) {
  * you may not use this file except in compliance with the License.
                     }
     "use strict";
+ * distributed under the License is distributed on an "AS IS" BASIS,
+                _maxKeptMsgsCount = 0;
  *
-    "use strict";
+                _maxKeptMsgsCount = 0;
  * Licensed under the Apache License, Version 2.0 (the "License");
-module.exports = function(RED) {
  * Copyright JS Foundation and other contributors, http://js.foundation
+            return {
 /**
+        this.fixedrate = this.rate;
+                _maxKeptMsgsCount = 0;
  * You may obtain a copy of the License at
- * you may not use this file except in compliance with the License.
-/**
+                _maxKeptMsgsCount = 0;
  * http://www.apache.org/licenses/LICENSE-2.0
- * you may not use this file except in compliance with the License.
-                        node.reportDepth();
+                                msgInfo.done();
 /**
-        this.lastSent = null;
+            if (node.buffer.length > 0) {
-    "use strict";
+ * Copyright JS Foundation and other contributors, http://js.foundation
  * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
 /**
+ * You may obtain a copy of the License at
  * you may not use this file except in compliance with the License.
-/**
-    "use strict";
+ * Copyright JS Foundation and other contributors, http://js.foundation
  * distributed under the License is distributed on an "AS IS" BASIS,
-    var MILLIS_TO_NANOS = 1000000;
 /**
-        this.fixedrate = this.rate;
+        } else if (n.randomUnits === "hours") {
-    var MILLIS_TO_NANOS = 1000000;
  * Copyright JS Foundation and other contributors, http://js.foundation
+ * distributed under the License is distributed on an "AS IS" BASIS,
 /**
-        function ourTimeout(handler, delay, clearHandler) {
                         }
-                        node.status({});
+                        node.status({fill:"blue",shape:"dot",text:node.buffer.length});
                         done();
                     }
                 }
@@ -382,13 +412,21 @@                     done();
                 }
                 if (msg.hasOwnProperty("flush")) {
  * Copyright JS Foundation and other contributors, http://js.foundation
+            if (node.buffer.length > 0) {
+        }
  * Licensed under the Apache License, Version 2.0 (the "License");
+                    while (len > 0) {
                         const msgInfo = node.buffer.shift();
  * Copyright JS Foundation and other contributors, http://js.foundation
+                msgInfo.done();
  * Copyright JS Foundation and other contributors, http://js.foundation
-/**
+            node.reportDepth();
  * Copyright JS Foundation and other contributors, http://js.foundation
+        var clearDelayList = function(s) {
 /**
+        function ourTimeout(handler, delay, clearHandler) {
+                        }
+        }
  * distributed under the License is distributed on an "AS IS" BASIS,
                     }
                     node.status({});
@@ -410,19 +449,22 @@                 var wait = node.randomFirst + (node.diff * Math.random());
                 var id = ourTimeout(function() {
                     node.idList.splice(node.idList.indexOf(id),1);
                     send(msg);
+                    if (node.timeout >= 1000) {
  * Copyright JS Foundation and other contributors, http://js.foundation
+ * You may obtain a copy of the License at
  * Licensed under the Apache License, Version 2.0 (the "License");
- * Copyright JS Foundation and other contributors, http://js.foundation
+                    }
                     done();
                 }, wait, () => done());
                 if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); }
                 else { node.idList.push(id); }
                 if (msg.hasOwnProperty("reset")) { clearDelayList(true); }
-                if (msg.hasOwnProperty("flush")) { flushDelayList(); done(); }
  * Copyright JS Foundation and other contributors, http://js.foundation
- * Licensed under the Apache License, Version 2.0 (the "License");
+ * You may obtain a copy of the License at
  * distributed under the License is distributed on an "AS IS" BASIS,
-                _maxKeptMsgsCount = RED.settings[name];
+ *
+/**
+                    node.status({fill:"blue",shape:"dot",text:node.idList.length});
                 }
             });
             node.on("close", function() { clearDelayList(); });




diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html
index 632cd9429be081f2b5d5b93bc3207edb6cda026c..1e801698a4ce5683869a12ae54671d6bf0e4356a 100644
--- a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html
+++ b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html
@@ -32,8 +32,9 @@         
reset
<dd>If the received message has this property set to any value, all outstanding messages held by the node are cleared without being sent.</dd> <dt class="optional">flush</dt> + <dd>If the received message has this property set to a numeric value then that many messages + you may not use this file except in compliance with the License. Copyright JS Foundation and other contributors, http://js.foundation - distributed under the License is distributed on an "AS IS" BASIS, outstanding messages held by the node are sent immediately.</dd> </dl> <h3>Details</h3> diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 3fa5f9de87cffaab12c55fa76cee127869713a73..87ca9c434991d686d7e16c1875d0563253ed311d 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -630,6 +630,53 @@ }); }); * + * @param aTimeout - the timeout quantity + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + else if (msg.topic === "bar") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(200,100); + c = c + 1; + } + else if (msg.topic === "boo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(400,100); + c = c + 1; + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + delayNode1.receive({payload:1,topic:"foo"}); + setImmediate( function() { delayNode1.receive({payload:1,topic:"foo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } ); + setImmediate( function() { delayNode1.receive({flush:2}); }); + setTimeout( function() { delayNode1.receive({flush:1}); }, 200); + setTimeout( function() { delayNode1.receive({flush:4}); }, 400); + }); + }); + + * this.timeout(2000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; @@ -698,6 +745,53 @@ }); }); * + // calculating the timeout in seconds + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + else if (msg.topic === "bar") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(200,100); + c = c + 1; + } + else if (msg.topic === "boo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(400,100); + c = c + 1; + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + delayNode1.receive({payload:1,topic:"foo"}); + setImmediate( function() { delayNode1.receive({payload:1,topic:"foo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"foo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } ); + setImmediate( function() { delayNode1.receive({flush:2}); }); + setTimeout( function() { delayNode1.receive({flush:1}); }, 200); + setTimeout( function() { delayNode1.receive({flush:4}); }, 400); + }); + }); + + * * Unless required by applicable law or agreed to in writing, software this.timeout(2000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, @@ -811,10 +905,11 @@ {msg:{payload:3,flush:true}, avr:0, var:100}]); }); it('calls done when queued messages are sent (queue)', function(done) { * - function closeEnough(actualValue, expectedValue, tolerancePercent) { + if (aTimeoutUnit == TimeUnitEnum.MILLIS) { * + * Unless required by applicable law or agreed to in writing, software * You may obtain a copy of the License at -/** + {msg:{payload:2,topic:"B"}, avr:2000, var:700}]); }); it('calls done when queued messages are sent (timed)', function(done) { mapiDoneTestHelper(done, "timed", false, [{msg:{payload:1,topic:"a"}, avr:500, var:700}, @@ -823,7 +918,7 @@ }); it('calls done when queue is reset (queue/timed)', function(done) { mapiDoneTestHelper(done, "timed", false, [{msg:{payload:1,topic:"a"}, avr:0, var:500}, {msg:{payload:2,reset:true}, avr:0, var:500}]); - var delayNode1 = helper.getNode("delayNode1"); + * Copyright JS Foundation and other contributors, http://js.foundation * Unless required by applicable law or agreed to in writing, software it('calls done when queue is flushed (queue/timed)', function(done) { mapiDoneTestHelper(done, "timed", false, [{msg:{payload:1,topic:"a"}, avr:0, var:500},