From 44854741c573fa8d0743c0cc7dd73323245b4b5b Mon Sep 17 00:00:00 2001
From: Dmitry Neverov <dmitry.neverov@gmail.com>
Date: Wed, 30 Jun 2010 10:46:53 -0700
Subject: [PATCH] Fix missing flush in StreamCopyThread

It is possible that StreamCopyThread will not flush everything
from it's src to it's dst.  In most cases StreamCopyThread works
like this:

  in loop:
    n = src.read(buf);
    dst.write(buf, 0, n);

and when we want to flush, we interrupt() StreamCopyThread and it
flushes everything it wrote to dst.

The problem is that our interrupt() could interrupt reading. In this
case we will flush everything we wrote to dst, but not everything
we wrote to src.

Change-Id: Ifaf4d8be87535c7364dd59b217dfc631460018ff
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../jgit/util/io/StreamCopyThread.java        | 26 +++++++++----------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
index f2715aca2..9129ece09 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
@@ -47,7 +47,6 @@
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** Thread to copy from an input stream to an output stream. */
 public class StreamCopyThread extends Thread {
@@ -57,8 +56,6 @@ public class StreamCopyThread extends Thread {
 
 	private final OutputStream dst;
 
-	private final AtomicInteger flushCounter = new AtomicInteger(0);
-
 	private volatile boolean done;
 
 	/**
@@ -85,7 +82,6 @@ public StreamCopyThread(final InputStream i, final OutputStream o) {
 	 * the request.
 	 */
 	public void flush() {
-		flushCounter.incrementAndGet();
 		interrupt();
 	}
 
@@ -113,10 +109,13 @@ public void halt() throws InterruptedException {
 	public void run() {
 		try {
 			final byte[] buf = new byte[BUFFER_SIZE];
+			int interruptCounter = 0;
 			for (;;) {
 				try {
-					if (needFlush())
+					if (interruptCounter > 0) {
 						dst.flush();
+						interruptCounter--;
+					}
 
 					if (done)
 						break;
@@ -125,17 +124,25 @@ public void run() {
 					try {
 						n = src.read(buf);
 					} catch (InterruptedIOException wakey) {
+						interruptCounter++;
 						continue;
 					}
 					if (n < 0)
 						break;
 
+					boolean writeInterrupted = false;
 					for (;;) {
 						try {
 							dst.write(buf, 0, n);
 						} catch (InterruptedIOException wakey) {
+							writeInterrupted = true;
 							continue;
 						}
+
+						// set interrupt status, which will be checked
+						// when we block in src.read
+						if (writeInterrupted)
+							interrupt();
 						break;
 					}
 				} catch (IOException e) {
@@ -155,13 +162,4 @@ public void run() {
 			}
 		}
 	}
-
-	private boolean needFlush() {
-		int i = flushCounter.get();
-		if (i > 0) {
-			flushCounter.decrementAndGet();
-			return true;
-		}
-		return false;
-	}
 }
-- 
GitLab